Core Data Model
Assets
Retrieve an asset by id
- AssetsAPI.retrieve(id: int | None = None, external_id: str | None = None) Asset | None
Retrieve a single asset by id.
- Parameters
id (int | None) – ID
external_id (str | None) – External ID
- Returns
Requested asset or None if it does not exist.
- Return type
Asset | None
Examples
Get asset by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve(id=1)
Get asset by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve(external_id="1")
Retrieve multiple assets by id
- AssetsAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) AssetList
Retrieve multiple assets by id.
- Parameters
ids (Sequence[int] | None) – IDs
external_ids (Sequence[str] | None) – External IDs
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested assets.
- Return type
AssetList
Examples
Get assets by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve_multiple(ids=[1, 2, 3])
Get assets by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve_multiple(external_ids=["abc", "def"], ignore_unknown_ids=True)
Retrieve an asset subtree
- AssetsAPI.retrieve_subtree(id: int | None = None, external_id: str | None = None, depth: int | None = None) AssetList
Retrieve the subtree for this asset up to a specified depth.
- Parameters
id (int | None) – Id of the root asset in the subtree.
external_id (str | None) – External id of the root asset in the subtree.
depth (int | None) – Retrieve assets up to this depth below the root asset in the subtree. Omit to get the entire subtree.
- Returns
The requested assets or empty AssetList if asset does not exist.
- Return type
AssetList
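Examples
No example accompanies this method above, so here is a minimal sketch, assuming an asset with external id "root" exists (the identifier is hypothetical):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> subtree = c.assets.retrieve_subtree(external_id="root", depth=2)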
List assets
- AssetsAPI.list(name: str | None = None, parent_ids: Sequence[int] | None = None, parent_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, metadata: dict[str, str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, root: bool | None = None, external_id_prefix: str | None = None, aggregated_properties: Sequence[str] | None = None, partitions: int | None = None, limit: int | None = 25) AssetList
- List assets.
- Parameters
name (str | None) – Name of asset. Often referred to as tag.
parent_ids (Sequence[int] | None) – Return only the direct descendants of the specified assets.
parent_external_ids (Sequence[str] | None) – Return only the direct descendants of the specified assets.
asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.
asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.
data_set_ids (int | Sequence[int] | None) – Return only assets in the specified data set(s) with this id / these ids.
data_set_external_ids (str | Sequence[str] | None) – Return only assets in the specified data set(s) with this external id / these external ids.
labels (LabelFilter | None) – Return only the assets matching the specified label filter.
geo_location (GeoLocationFilter | None) – Only include assets matching the specified geographic relation.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value.
source (str | None) – The source of this asset.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
root (bool | None) – Whether the filtered assets are root assets or not.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
aggregated_properties (Sequence[str] | None) – Set of aggregated properties to include.
partitions (int | None) – Retrieve assets in parallel using this number of workers. Also requires limit=None to be passed. To prevent unexpected problems and maximize read throughput, the API documentation recommends using at most 10 partitions; when using more than 10, actual throughput decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.
limit (int | None) – Maximum number of assets to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
- Returns
List of requested assets
- Return type
AssetList
Examples
List assets:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset_list = c.assets.list(limit=5)
Iterate over assets:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for asset in c.assets:
...     asset  # do something with the asset
Iterate over chunks of assets to reduce memory load:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for asset_list in c.assets(chunk_size=2500):
...     asset_list  # do something with the assets
Filter assets based on labels:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["PUMP", "VERIFIED"])
>>> asset_list = c.assets.list(labels=my_label_filter)
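Retrieve all assets in parallel — a sketch based on the partitions parameter described above, which requires limit=None (10 partitions follows the stated recommendation):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> all_assets = c.assets.list(partitions=10, limit=None)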
Aggregate assets
- AssetsAPI.aggregate(filter: AssetFilter | dict | None = None) list[AssetAggregate]
- Aggregate assets.
- Parameters
filter (AssetFilter | dict | None) – Filter on assets with strict matching.
- Returns
List of asset aggregates
- Return type
list[AssetAggregate]
Examples
Aggregate assets:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate(filter={"external_id_prefix": "prefix"})
Aggregate asset metadata keys
- AssetsAPI.aggregate_metadata_keys(filter: AssetFilter | dict | None = None) Sequence[AggregateBucketResult]
- Aggregate asset metadata keys.
Note
In the case of text fields, the values are aggregated in a case-insensitive manner
- Parameters
filter (AssetFilter | dict | None) – Filter on assets with strict matching.
- Returns
List of asset aggregates
- Return type
Sequence[AggregateBucketResult]
Examples
Aggregate assets:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate_metadata_keys(filter={"external_id_prefix": "prefix"})
Aggregate asset metadata values
- AssetsAPI.aggregate_metadata_values(keys: Sequence[str], filter: AssetFilter | dict | None = None) Sequence[AggregateBucketResult]
- Aggregate asset metadata values.
Note
In the case of text fields, the values are aggregated in a case-insensitive manner
- Parameters
keys (Sequence[str]) – Metadata key(s) to apply the aggregation on. Currently supports exactly one key per request.
filter (AssetFilter | dict | None) – Filter on assets with strict matching.
- Returns
List of asset aggregates
- Return type
Sequence[AggregateBucketResult]
Examples
Aggregate assets:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate_metadata_values(
...     keys=["someKey"],
...     filter={"external_id_prefix": "prefix"}
... )
Aggregate Asset Count
- AssetsAPI.aggregate_count(property: AssetPropertyLike | None = None, advanced_filter: Filter | dict | None = None, filter: AssetFilter | dict | None = None) int
Count of assets matching the specified filters.
- Parameters
property (AssetPropertyLike | None) – If specified, get an approximate number of assets with a specific property (property is not null) and matching the filters.
advanced_filter (Filter | dict | None) – The advanced filter to narrow down the assets to count.
filter (AssetFilter | dict | None) – The filter to narrow down the assets to count (strict matching).
- Returns
The number of assets matching the specified filters.
- Return type
int
Examples
Count the number of assets in your CDF project:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.assets.aggregate_count()
Count the number of assets with the metadata key “timezone” in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> has_timezone = filters.ContainsAny(AssetProperty.metadata, "timezone")
>>> asset_count = c.assets.aggregate_count(advanced_filter=has_timezone)
Aggregate Asset Value Cardinality
- AssetsAPI.aggregate_cardinality_values(property: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) int
Find approximate property count for assets.
- Parameters
property (AssetPropertyLike) – The property to count the cardinality of.
advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of labels used by assets in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> label_count = c.assets.aggregate_cardinality_values(AssetProperty.labels)
Count the number of timezones (metadata key) for assets with the word “critical” in the description in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> is_critical = filters.Search(AssetProperty.description, "critical")
>>> critical_assets = c.assets.aggregate_cardinality_values(AssetProperty.metadata_key("timezone"), advanced_filter=is_critical)
Aggregate Asset Property Cardinality
- AssetsAPI.aggregate_cardinality_properties(path: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) int
Find approximate path count for assets.
- Parameters
path (AssetPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).
- Returns
The number of properties matching the specified filters.
- Return type
int
Examples
Count the number of unique metadata keys used by assets in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> key_count = c.assets.aggregate_cardinality_properties(AssetProperty.metadata)
Aggregate Asset Unique Values
- AssetsAPI.aggregate_unique_values(property: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) UniqueResultList
Get unique properties with counts for assets.
- Parameters
property (AssetPropertyLike) – The property to group by.
advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).
- Returns
List of unique values of assets matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the timezones (metadata key) with count for your assets in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> result = c.assets.aggregate_unique_values(AssetProperty.metadata_key("timezone"))
>>> print(result.unique)
Get the different labels with count used for assets created after 2020-01-01 in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(AssetProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.assets.aggregate_unique_values(AssetProperty.labels, advanced_filter=created_after_2020)
>>> print(result.unique)
Get the different labels with count for assets updated after 2020-01-01 in your CDF project, but exclude all labels that start with “test”:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> created_after_2020 = filters.Range(AssetProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.assets.aggregate_unique_values(AssetProperty.labels, advanced_filter=created_after_2020, aggregate_filter=not_test)
>>> print(result.unique)
Aggregate Asset Unique Properties
- AssetsAPI.aggregate_unique_properties(path: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) UniqueResultList
Get unique paths with counts for assets.
- Parameters
path (AssetPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).
- Returns
List of unique values of assets matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the metadata keys with counts for your assets in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> result = c.assets.aggregate_unique_properties(AssetProperty.metadata)
Search for assets
- AssetsAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: AssetFilter | dict | None = None, limit: int = 25) AssetList
Search for assets. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.
- Parameters
name (str | None) – Fuzzy match on name.
description (str | None) – Fuzzy match on description.
query (str | None) – Whitespace-separated terms to search for in assets. Does a best-effort fuzzy search in relevant fields (currently name and description) for variations of any of the search terms, and orders results by relevance.
filter (AssetFilter | dict | None) – Filter to apply. Performs exact match on these fields.
limit (int) – Maximum number of results to return.
- Returns
List of requested assets
- Return type
AssetList
Examples
Search for assets by fuzzy search on name:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(name="some name")
Search for assets by exact search on name:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(filter={"name": "some name"})
Search for assets by improved multi-field fuzzy search:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(query="TAG 30 XV")
Search for assets using multiple filters, finding all assets with name similar to xyz with parent asset 123 or 456 with source some source:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(name="xyz", filter={"parent_ids": [123, 456], "source": "some source"})
Search for an asset with an attached label:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter, AssetFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["PUMP"])
>>> res = c.assets.search(name="xyz", filter=AssetFilter(labels=my_label_filter))
Create assets
- AssetsAPI.create(asset: Sequence[Asset]) AssetList
- AssetsAPI.create(asset: Asset) Asset
- You can create an arbitrary number of assets, and the SDK will split the request into multiple requests. When specifying a parent-child relation between assets using parentExternalId, the link will be resolved into an internal ID and stored as parentId.
- Parameters
asset (Asset | Sequence[Asset]) – Asset or list of assets to create.
- Returns
Created asset(s)
- Return type
Asset | AssetList
Examples
Create new assets:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> assets = [Asset(name="asset1"), Asset(name="asset2")]
>>> res = c.assets.create(assets)
Create asset with label:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset, Label
>>> c = CogniteClient()
>>> asset = Asset(name="my_pump", labels=[Label(external_id="PUMP")])
>>> res = c.assets.create(asset)
Create asset hierarchy
- AssetsAPI.create_hierarchy(assets: Sequence[Asset] | AssetHierarchy, *, upsert: bool = False, upsert_mode: Literal['patch', 'replace'] = 'patch') AssetList
Create an asset hierarchy with validation.
This helper function makes it easy to insert large asset hierarchies. It solves the problem of topological insertion order, i.e. a parent asset must exist before it can be referenced by any ‘children’ assets. You may pass any number of partial- or full hierarchies: there are no requirements on the number of root assets, so you may pass zero, one or many (same goes for the non-root assets).
- Parameters
assets (Sequence[Asset] | AssetHierarchy) – List of assets to create or an instance of AssetHierarchy.
upsert (bool) – If used, already existing assets will be updated instead of an exception being raised. You may control how updates are applied with the ‘upsert_mode’ argument.
upsert_mode (Literal["patch", "replace"]) – Only applicable with upsert. Pass ‘patch’ to only update fields with non-null values (default), or ‘replace’ to do full updates (unset fields become null or empty).
- Returns
Created (and possibly updated) asset hierarchy
- Return type
AssetList
Prior to insertion, this function will run validation on the given assets and raise an error if any of the following issues are found:
- Any assets are invalid (category: invalid):
  - Missing external ID.
  - Missing a valid name.
  - Has an ID set.
- Any asset duplicates exist (category: duplicates)
- Any assets have an ambiguous parent link (category: unsure_parents)
- Any group of assets form a cycle, e.g. A->B->A (category: cycles)
As part of validation there is a fifth category that is ignored when using this method (for backwards compatibility): orphan assets. These are assets linking to a parent by an identifier that is not present among the given assets, and as such might contain links we are unable to vet ahead of insertion. These are thus assumed to be valid, but may fail.
Tip
The different categories specified above correspond to the name of the attribute you might access on the raised error to get the collection of ‘bad’ assets falling in that group, e.g. error.duplicates.
Note
Updating external_id via upsert is not supported (and will not be supported). Use AssetsAPI.update instead.
Warning
The API does not natively support upsert, so the SDK has to simulate the behaviour at the cost of some insertion speed.
Be careful when moving assets to new parents via upsert: please do so only by specifying parent_external_id (instead of parent_id) to avoid race conditions in insertion order (temporary cycles might form since we can only make changes to 1000 assets at a time).
Examples
Create an asset hierarchy:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> assets = [
...     Asset(external_id="root", name="root"),
...     Asset(external_id="child1", parent_external_id="root", name="child1"),
...     Asset(external_id="child2", parent_external_id="root", name="child2")]
>>> res = c.assets.create_hierarchy(assets)
Create an asset hierarchy, but run update for existing assets:
>>> res = c.assets.create_hierarchy(assets, upsert=True, upsert_mode="patch")
Patch will only update the parameters you have defined on your assets. Note that specifically setting something to None is the same as not setting it. For metadata, this will extend your existing data, only overwriting when keys overlap. For labels the behaviour is mostly the same: existing labels are left untouched, and your new ones are simply added.
You may also pass upsert_mode="replace" to make sure the updated assets look identical to the ones you passed to the method. For both metadata and labels this will clear out all existing values before (potentially) adding the new ones.
If the hierarchy validation for some reason fails, you may inspect all the issues that were found by catching CogniteAssetHierarchyError:
>>> from cognite.client.exceptions import CogniteAssetHierarchyError
>>> try:
...     res = c.assets.create_hierarchy(assets)
... except CogniteAssetHierarchyError as err:
...     if err.invalid:
...         ...  # do something
In addition to invalid, you may inspect duplicates, unsure_parents, orphans and cycles. Note that cycles are not available if any of the other basic issues exist, as the search for cyclical references requires a clean asset hierarchy to begin with.
You may also wrap the create_hierarchy() call in a try-except to get information if any of the assets fails to be created (assuming a valid hierarchy):
>>> from cognite.client.exceptions import CogniteAPIError
>>> try:
...     c.assets.create_hierarchy(assets)
... except CogniteAPIError as err:
...     created = err.successful
...     maybe_created = err.unknown
...     not_created = err.failed
Here’s a slightly longer explanation of the different groups:
- err.successful: Which assets were created (request yielded a 201)
- err.unknown: Which assets may have been created (request yielded 5xx)
- err.failed: Which assets were not created (request yielded 4xx, or was a descendant of an asset with unknown status)
The preferred way to create an asset hierarchy is to run validation prior to insertion. You may do this by using the AssetHierarchy class. It will by default consider orphan assets to be problematic (but accepts the boolean parameter ignore_orphans), contrary to how create_hierarchy works (which accepts them in order to be backwards-compatible). It also provides helpful methods to create reports of any issues found; check out validate_and_report:
>>> from cognite.client.data_classes import AssetHierarchy
>>> from pathlib import Path
>>> hierarchy = AssetHierarchy(assets)
>>> if hierarchy.is_valid():
...     res = c.assets.create_hierarchy(hierarchy)
... else:
...     hierarchy.validate_and_report(output_file=Path("report.txt"))
Delete assets
- AssetsAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, recursive: bool = False, ignore_unknown_ids: bool = False) None
- Delete one or more assets.
- Parameters
id (int | Sequence[int] | None) – Id or list of ids
external_id (str | Sequence[str] | None) – External ID or list of external ids
recursive (bool) – Recursively delete whole asset subtrees under given ids. Defaults to False.
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
Examples
Delete assets by id or external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.assets.delete(id=[1, 2, 3], external_id="3")
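Based on the recursive parameter above, a sketch deleting a whole subtree under a single root (the external id is hypothetical):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.assets.delete(external_id="root", recursive=True)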
Filter assets
- AssetsAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, aggregated_properties: Sequence[Literal['child_count', 'path', 'depth']] | None = None, limit: int | None = 25) AssetList
- Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.
- Parameters
filter (Filter | dict) – Filter to apply.
sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. Can be up to two properties to sort by; defaults to ascending order.
aggregated_properties (Sequence[Literal["child_count", "path", "depth"]] | None) – Set of aggregated properties to include. Options are childCount, path, depth.
limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
- Returns
List of assets that match the filter criteria.
- Return type
AssetList
Examples
Find all assets that have a metadata key ‘timezone’ starting with ‘Europe’, and sort by external id ascending:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> in_timezone = f.Prefix(["metadata", "timezone"], "Europe")
>>> res = c.assets.filter(filter=in_timezone,
...                       sort=("external_id", "asc"))
Note that you can check the API documentation above to see which properties you can filter on with which filters.
To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the AssetProperty and SortableAssetProperty Enums.
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty, SortableAssetProperty
>>> c = CogniteClient()
>>> f = filters
>>> in_timezone = f.Prefix(AssetProperty.metadata_key("timezone"), "Europe")
>>> res = c.assets.filter(filter=in_timezone,
...                       sort=(SortableAssetProperty.external_id, "asc"))
Update assets
- AssetsAPI.update(item: Sequence[Asset | AssetUpdate]) AssetList
- AssetsAPI.update(item: Asset | AssetUpdate) Asset
Update one or more assets. Labels can be added, removed or replaced (set). Note that the set operation deletes all the existing labels and adds the new specified labels.
- Parameters
item (Asset | AssetUpdate | Sequence[Asset | AssetUpdate]) – Asset(s) to update
- Returns
Updated asset(s)
- Return type
Asset | AssetList
Examples
Perform a partial update on an asset, updating the description and adding a new field to metadata:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res1 = c.assets.update(my_update)
>>> # Remove an already set field like so
>>> another_update = AssetUpdate(id=1).description.set(None)
>>> res2 = c.assets.update(another_update)
Remove the metadata on an asset:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).metadata.add({"key": "value"})
>>> res1 = c.assets.update(my_update)
>>> another_update = AssetUpdate(id=1).metadata.set(None)
>>> # The same result can be achieved with:
>>> another_update2 = AssetUpdate(id=1).metadata.set({})
>>> res2 = c.assets.update(another_update)
Attach labels to an asset:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.add(["PUMP", "VERIFIED"])
>>> res = c.assets.update(my_update)
Detach a single label from an asset:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.remove("PUMP")
>>> res = c.assets.update(my_update)
Replace all labels for an asset:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.set("PUMP")
>>> res = c.assets.update(my_update)
Upsert assets
- AssetsAPI.upsert(item: Sequence[Asset], mode: Literal['patch', 'replace'] = 'patch') AssetList
- AssetsAPI.upsert(item: Asset, mode: Literal['patch', 'replace'] = 'patch') Asset
- Upsert assets, i.e., update the assets that exist and create those that do not.
Note this is a convenience method that handles the upserting for you by first calling update on all items, and if any of them fail because they do not exist, it will create them instead.
For more details, see Upsert.
- Parameters
item (Asset | Sequence[Asset]) – Asset or list of assets to upsert.
mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the assets are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified.
- Returns
The upserted asset(s).
- Return type
Asset | AssetList
Examples
Upsert for assets:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> existing_asset = c.assets.retrieve(id=1)
>>> existing_asset.description = "New description"
>>> new_asset = Asset(external_id="new_asset", description="New asset")
>>> res = c.assets.upsert([existing_asset, new_asset], mode="replace")
Asset Data classes
- class cognite.client.data_classes.assets.AggregateResultItem(child_count: int | None = None, depth: int | None = None, path: list[dict[str, Any]] | None = None, **kwargs: Any)
Bases:
dict
Aggregated metrics of the asset
- Parameters
child_count (int | None) – Number of direct descendants for the asset
depth (int | None) – Asset path depth (number of levels below root node).
path (list[dict[str, Any]] | None) – IDs of assets on the path to the asset.
**kwargs (Any) – No description.
- class cognite.client.data_classes.assets.Asset(external_id: str | None = None, name: str | None = None, parent_id: int | None = None, parent_external_id: str | None = None, description: str | None = None, data_set_id: int | None = None, metadata: dict[str, str] | None = None, source: str | None = None, labels: list[Label | str | LabelDefinition | dict] | None = None, geo_location: GeoLocation | None = None, id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, root_id: int | None = None, aggregates: dict[str, Any] | AggregateResultItem | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A representation of a physical asset, for example a factory or a piece of equipment.
- Parameters
external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.
name (str | None) – The name of the asset.
parent_id (int | None) – The parent of the node, null if it is the root node.
parent_external_id (str | None) – The external ID of the parent. The property is omitted if the asset doesn’t have a parent or if the parent doesn’t have externalId.
description (str | None) – The description of the asset.
data_set_id (int | None) – The id of the dataset this asset belongs to.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.
source (str | None) – The source of the asset.
labels (list[Label | str | LabelDefinition | dict] | None) – A list of the labels associated with this resource item.
geo_location (GeoLocation | None) – The geographic metadata of the asset.
id (int | None) – A server-generated ID for the object.
created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
root_id (int | None) – ID of the root asset.
aggregates (dict[str, Any] | AggregateResultItem | None) – Aggregated metrics of the asset
cognite_client (CogniteClient | None) – The client to associate with this object.
- dump(camel_case: bool = False) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
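A small sketch of dump with camel_case, using a locally constructed (hypothetical) asset:
>>> from cognite.client.data_classes import Asset
>>> asset = Asset(external_id="my_asset", name="my asset")
>>> dumped = asset.dump(camel_case=True)
>>> dumped["externalId"]
'my_asset'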
- events(**kwargs: Any) EventList
Retrieve all events related to this asset.
- Parameters
**kwargs (Any) – All extra keyword arguments are passed to events/list. NB: ‘asset_ids’ can’t be used.
- Returns
All events related to this asset.
- Return type
EventList
- files(**kwargs: Any) FileMetadataList
Retrieve all files metadata related to this asset.
- Parameters
**kwargs (Any) – All extra keyword arguments are passed to files/list. NB: ‘asset_ids’ can’t be used.
- Returns
Metadata about all files related to this asset.
- Return type
FileMetadataList
- sequences(**kwargs: Any) SequenceList
Retrieve all sequences related to this asset.
- Parameters
**kwargs (Any) – All extra keyword arguments are passed to sequences/list. NB: ‘asset_ids’ can’t be used.
- Returns
All sequences related to this asset.
- Return type
SequenceList
- subtree(depth: int | None = None) AssetList
Returns the subtree of this asset up to a specified depth.
- Parameters
depth (int | None) – Retrieve assets up to this depth below the asset.
- Returns
The requested assets sorted topologically.
- Return type
AssetList
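A sketch fetching an asset and then its subtree, assuming an asset with id 1 exists:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset = c.assets.retrieve(id=1)
>>> descendants = asset.subtree(depth=2)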
- time_series(**kwargs: Any) TimeSeriesList
Retrieve all time series related to this asset.
- Parameters
**kwargs (Any) – All extra keyword arguments are passed to time_series/list. NB: ‘asset_ids’ can’t be used.
- Returns
All time series related to this asset.
- Return type
TimeSeriesList
- to_pandas(expand: Sequence[str] = ('metadata', 'aggregates'), ignore: list[str] | None = None, camel_case: bool = False) pandas.DataFrame
Convert the instance into a pandas DataFrame.
- Parameters
expand (Sequence[str]) – List of row keys to expand, only works if the value is a Dict.
ignore (list[str] | None) – List of row keys to not include when converting to a data frame.
camel_case (bool) – Convert column names to camel case (e.g. externalId instead of external_id)
- Returns
The dataframe.
- Return type
pandas.DataFrame
- class cognite.client.data_classes.assets.AssetAggregate(count: int | None = None, **kwargs: Any)
Bases:
dict
Aggregation group of assets
- Parameters
count (int | None) – Size of the aggregation group
**kwargs (Any) – No description.
- class cognite.client.data_classes.assets.AssetFilter(name: str | None = None, parent_ids: Sequence[int] | None = None, parent_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, metadata: dict[str, str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, root: bool | None = None, external_id_prefix: str | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None)
Bases:
CogniteFilter
Filter on assets with strict matching.
- Parameters
name (str | None) – The name of the asset.
parent_ids (Sequence[int] | None) – Return only the direct descendants of the specified assets.
parent_external_ids (Sequence[str] | None) – Return only the direct descendants of the specified assets.
asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include assets in subtrees rooted at the specified assets (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.
data_set_ids (Sequence[dict[str, Any]] | None) – Only include assets that belong to the specified data set(s).
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.
source (str | None) – The source of the asset.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
root (bool | None) – Whether the filtered assets are root assets, or not. Set to True to only list root assets.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
labels (LabelFilter | None) – Return only the resource matching the specified label constraints.
geo_location (GeoLocationFilter | None) – Only include assets matching the specified geographic relation.
- dump(camel_case: bool = False) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
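A minimal sketch constructing a filter and dumping it (the field values are hypothetical):
>>> from cognite.client.data_classes import AssetFilter
>>> flt = AssetFilter(external_id_prefix="pump", root=False)
>>> dumped = flt.dump(camel_case=True)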
- class cognite.client.data_classes.assets.AssetHierarchy(assets: Sequence[Asset], ignore_orphans: bool = False)
Bases:
object
Class that verifies whether a collection of Assets is valid by validating its internal consistency. This is done “offline”, meaning CDF is -not- queried for the already existing assets. As a result, any asset providing a parent link by ID instead of external ID is assumed valid.
- Parameters
assets (Sequence[Asset]) – Sequence of assets to be inspected for validity.
ignore_orphans (bool) – If true, orphan assets are assumed valid and won’t raise.
Examples
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetHierarchy
>>> client = CogniteClient()
>>> hierarchy = AssetHierarchy(assets)
>>> # Get a report written to the terminal listing any issues:
>>> hierarchy.validate_and_report()
>>> if hierarchy.is_valid():
...     res = client.assets.create_hierarchy(hierarchy)
... # If there are issues, you may inspect them directly:
... else:
...     hierarchy.orphans
...     hierarchy.invalid
...     hierarchy.unsure_parents
...     hierarchy.duplicates
...     hierarchy.cycles  # Requires no other basic issues
There are other ways to generate the report than to write directly to screen. You may pass an output_file which can be either a Path object (writes are done in append-mode) or a file-like object supporting write (default is None, which is just regular print):
>>> # Get a report written to file:
>>> from pathlib import Path
>>> report = Path("path/to/my_report.txt")
>>> hierarchy = AssetHierarchy(assets)
>>> hierarchy.validate_and_report(output_file=report)
>>> # Get a report as text "in memory":
>>> import io
>>> with io.StringIO() as file_like:
...     hierarchy.validate_and_report(output_file=file_like)
...     report = file_like.getvalue()
- count_subtree(mapping: dict[str | None, list[Asset]]) dict[str, int]
Returns a mapping from asset external ID to the size of its subtree (children and children of children etc.).
- Parameters
mapping (dict[str | None, list[Asset]]) – The mapping returned by groupby_parent_xid(). If None is passed, it will be recreated (slightly expensive).
- Returns
Lookup from external ID to descendant count.
- Return type
dict[str, int]
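A sketch combining groupby_parent_xid() (documented below) with count_subtree(), assuming assets is a sequence of Asset objects:
>>> from cognite.client.data_classes import AssetHierarchy
>>> hierarchy = AssetHierarchy(assets)
>>> mapping = hierarchy.groupby_parent_xid()
>>> counts = hierarchy.count_subtree(mapping)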
- groupby_parent_xid() dict[str | None, list[Asset]]
Returns a mapping from parent external ID to a list of its direct children.
Note
If the AssetHierarchy was initialized with ignore_orphans=True, all orphan assets, if any, are returned as part of the root assets in the mapping and can be accessed by mapping[None]. The same is true for all assets linking their parent by ID.
- Returns
Mapping from parent external ID to a list of its direct children.
- Return type
dict[str | None, list[Asset]]
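Following the note above, a sketch listing the top-level group (root assets, plus orphans when ignore_orphans=True; assets is hypothetical):
>>> from cognite.client.data_classes import AssetHierarchy
>>> hierarchy = AssetHierarchy(assets, ignore_orphans=True)
>>> top_level = hierarchy.groupby_parent_xid().get(None, [])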
- class cognite.client.data_classes.assets.AssetList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[Asset], IdTransformerMixin
- events() EventList
Retrieve all events related to these assets.
- Returns
All events related to the assets in this AssetList.
- Return type
EventList
- files() FileMetadataList
Retrieve all files metadata related to these assets.
- Returns
Metadata about all files related to the assets in this AssetList.
- Return type
FileMetadataList
- sequences() SequenceList
Retrieve all sequences related to these assets.
- Returns
All sequences related to the assets in this AssetList.
- Return type
SequenceList
- time_series() TimeSeriesList
Retrieve all time series related to these assets.
- Returns
All time series related to the assets in this AssetList.
- Return type
TimeSeriesList
- class cognite.client.data_classes.assets.AssetProperty(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.assets.AssetUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
Changes applied to asset
- Parameters
id (int) – A server-generated ID for the object.
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
- class cognite.client.data_classes.assets.SortableAssetProperty(value)
Bases:
EnumProperty
An enumeration.
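A short sketch of how these enums are typically combined with filter and sort (see AssetsAPI.filter above):
>>> from cognite.client.data_classes.assets import AssetProperty, SortableAssetProperty
>>> prop = AssetProperty.metadata_key("timezone")
>>> sort_spec = (SortableAssetProperty.external_id, "asc")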
Events
Retrieve an event by id
- EventsAPI.retrieve(id: int | None = None, external_id: str | None = None) Event | None
Retrieve a single event by id.
- Parameters
id (int | None) – ID
external_id (str | None) – External ID
- Returns
Requested event or None if it does not exist.
- Return type
Event | None
Examples
Get event by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve(id=1)
Get event by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve(external_id="1")
Retrieve multiple events by id
- EventsAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) EventList
Retrieve multiple events by id.
- Parameters
ids (Sequence[int] | None) – IDs
external_ids (Sequence[str] | None) – External IDs
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested events.
- Return type
EventList
Examples
Get events by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve_multiple(ids=[1, 2, 3])
Get events by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve_multiple(external_ids=["abc", "def"])
List events
- EventsAPI.list(start_time: dict[str, Any] | TimestampRange | None = None, end_time: dict[str, Any] | EndTimeFilter | None = None, active_at_time: dict[str, Any] | TimestampRange | None = None, type: str | None = None, subtype: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None, sort: Sequence[str] | None = None, partitions: int | None = None, limit: int | None = 25) EventList
- List events.
- Parameters
start_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
end_time (dict[str, Any] | EndTimeFilter | None) – Range between two timestamps.
active_at_time (dict[str, Any] | TimestampRange | None) – Event is considered active from its startTime to endTime inclusive. If startTime is null, event is never active. If endTime is null, event is active from startTime onwards. activeAtTime filter will match all events that are active at some point from min to max, from min, or to max, depending on which of min and max parameters are specified.
type (str | None) – Type of the event, e.g ‘failure’.
subtype (str | None) – Subtype of the event, e.g ‘electrical’.
metadata (dict[str, str] | None) – Customizable extra data about the event. String key -> String value.
asset_ids (Sequence[int] | None) – Asset IDs of related equipment that this event relates to.
asset_external_ids (Sequence[str] | None) – Asset External IDs of related equipment that this event relates to.
asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.
asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.
data_set_ids (int | Sequence[int] | None) – Return only events in the specified data set(s) with this id / these ids.
data_set_external_ids (str | Sequence[str] | None) – Return only events in the specified data set(s) with this external id / these external ids.
source (str | None) – The source of this event.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
sort (Sequence[str] | None) – Sort by an array of selected fields. Ex: ["startTime:desc"]. Default sort order is asc when omitted. Accepts the following field names: startTime, endTime, createdTime, lastUpdatedTime. Only one field is supported for now.
partitions (int | None) – Retrieve events in parallel using this number of workers. Also requires limit=None to be passed. To prevent unexpected problems and maximize read throughput, the API documentation recommends using at most 10 partitions; when using more than 10, actual throughput decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.
limit (int | None) – Maximum number of events to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
- Returns
List of requested events
- Return type
EventList
Examples
List events and filter on max start time:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> event_list = c.events.list(limit=5, start_time={"max": 1500000000})
Iterate over events:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for event in c.events:
...     event  # do something with the event
Iterate over chunks of events to reduce memory load:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for event_list in c.events(chunk_size=2500):
...     event_list  # do something with the events
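Based on the active_at_time parameter description above, a sketch listing events active at some point during a window (the timestamps are hypothetical, in ms):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> active = c.events.list(active_at_time={"min": 1577836800000, "max": 1580515200000})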
Aggregate events
- EventsAPI.aggregate(filter: EventFilter | dict | None = None) list[AggregateResult]
- Aggregate events.
- Parameters
filter (EventFilter | dict | None) – Filter on events with exact match.
- Returns
List of event aggregates
- Return type
list[AggregateResult]
Examples
Aggregate events:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_type = c.events.aggregate(filter={"type": "failure"})
Aggregate Event Count
- EventsAPI.aggregate_count(property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, filter: EventFilter | dict | None = None) int
Count of events matching the specified filters.
- Parameters
property (EventPropertyLike | None) – If specified, get an approximate number of events with a specific property (property is not null) and matching the filters.
advanced_filter (Filter | dict | None) – The filter to narrow down the events to count.
filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.
- Returns
The number of events matching the specified filters and search.
- Return type
int
Examples
Count the number of events in your CDF project:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.events.aggregate_count()
Count the number of workorder events in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> is_workorder = filters.Equals(EventProperty.type, "workorder")
>>> workorder_count = c.events.aggregate_count(advanced_filter=is_workorder)
Aggregate Event Values Cardinality
- EventsAPI.aggregate_cardinality_values(property: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) int
Find approximate property count for events.
- Parameters
property (EventPropertyLike) – The property to count the cardinality of.
advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.
- Returns
The number of properties matching the specified filter.
- Return type
int
Examples
Count the number of types of events in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> type_count = c.events.aggregate_cardinality_values(EventProperty.type)
Count the number of types of events linked to asset 123 in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> is_asset = filters.ContainsAny(EventProperty.asset_ids, 123)
>>> type_count = c.events.aggregate_cardinality_values(EventProperty.type, advanced_filter=is_asset)
Aggregate Event Property Cardinality
- EventsAPI.aggregate_cardinality_properties(path: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) int
Find approximate path count for events.
- Parameters
path (EventPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of metadata keys for events in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> type_count = c.events.aggregate_cardinality_properties(EventProperty.metadata)
Aggregate Event Unique Values
- EventsAPI.aggregate_unique_values(fields: Sequence[str], filter: EventFilter | dict | None = None, property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None) list[AggregateUniqueValuesResult]
- EventsAPI.aggregate_unique_values(fields: Literal[None] = None, filter: EventFilter | dict | None = None, property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None) UniqueResultList
Get unique properties with counts for events.
- Parameters
fields (Sequence[str] | None) – The fields to return. Defaults to ["count"].
filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.
property (EventPropertyLike | None) – The property name(s) to apply the aggregation on.
advanced_filter (Filter | dict | None) – The filter to narrow down the events to consider.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
- Returns
List of unique values of events matching the specified filters and search.
- Return type
list[AggregateUniqueValuesResult] | UniqueResultList
Examples
Get the unique types with count of events in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> result = c.events.aggregate_unique_values(EventProperty.type)
>>> print(result.unique)
Get the unique types of events after 2020-01-01 in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> is_after_2020 = filters.Range(EventProperty.start_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.events.aggregate_unique_values(EventProperty.type, advanced_filter=is_after_2020)
>>> print(result.unique)
Get the unique types of events after 2020-01-01 in your CDF project, but exclude all types that start with “planned”:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import aggregations, filters
>>> from cognite.client.data_classes.events import EventProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> agg = aggregations
>>> not_planned = agg.Not(agg.Prefix("planned"))
>>> is_after_2020 = filters.Range(EventProperty.start_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.events.aggregate_unique_values(EventProperty.type, advanced_filter=is_after_2020, aggregate_filter=not_planned)
>>> print(result.unique)
Aggregate Event Unique Properties
- EventsAPI.aggregate_unique_properties(path: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) UniqueResultList
Get unique paths with counts for events.
- Parameters
path (EventPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.
- Returns
List of unique values of events matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the unique metadata keys with count of events in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> result = c.events.aggregate_unique_properties(EventProperty.metadata)
>>> print(result.unique)
Search for events
- EventsAPI.search(description: str | None = None, filter: EventFilter | dict | None = None, limit: int = 25) EventList
Search for events. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.
- Parameters
description (str | None) – Fuzzy match on description.
filter (EventFilter | dict | None) – Filter to apply. Performs exact match on these fields.
limit (int) – Maximum number of results to return.
- Returns
List of requested events
- Return type
EventList
Examples
Search for events:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.search(description="some description")
Create events
- EventsAPI.create(event: Sequence[Event]) EventList
- EventsAPI.create(event: Event) Event
- Create one or more events.
- Parameters
event (Event | Sequence[Event]) – Event or list of events to create.
- Returns
Created event(s)
- Return type
Event | EventList
Examples
Create new events:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Event
>>> c = CogniteClient()
>>> events = [Event(start_time=0, end_time=1), Event(start_time=2, end_time=3)]
>>> res = c.events.create(events)
Delete events
- EventsAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, ignore_unknown_ids: bool = False) None
- Delete one or more events.
- Parameters
id (int | Sequence[int] | None) – Id or list of ids
external_id (str | Sequence[str] | None) – External ID or list of external ids
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
Examples
Delete events by id or external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.events.delete(id=[1, 2, 3], external_id="3")
Update events
- EventsAPI.update(item: Sequence[Event | EventUpdate]) EventList
- EventsAPI.update(item: Event | EventUpdate) Event
- Update one or more events.
- Parameters
item (Event | EventUpdate | Sequence[Event | EventUpdate]) – Event(s) to update
- Returns
Updated event(s)
- Return type
Event | EventList
Examples
Update an event that you have fetched. This will perform a full update of the event:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> event = c.events.retrieve(id=1)
>>> event.description = "New description"
>>> res = c.events.update(event)
Perform a partial update on an event, updating the description and adding a new field to metadata:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import EventUpdate
>>> c = CogniteClient()
>>> my_update = EventUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.events.update(my_update)
Upsert events
- EventsAPI.upsert(item: Sequence[Event], mode: Literal['patch', 'replace'] = 'patch') EventList
- EventsAPI.upsert(item: Event, mode: Literal['patch', 'replace'] = 'patch') Event
- Upsert events, i.e., update events that exist and create those that do not.
Note that this is a convenience method that handles the upserting for you by first calling update on all items, and if any of them fail because they do not exist, it will create them instead.
For more details, see Upsert.
- Parameters
item (Event | Sequence[Event]) – Event or list of events to upsert.
mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the events already exist. If you set 'patch', the call will only update fields with non-null values (default). Setting 'replace' will unset any fields that are not specified.
- Returns
The upserted event(s).
- Return type
Event | EventList
Examples
Upsert for events:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Event
>>> c = CogniteClient()
>>> existing_event = c.events.retrieve(id=1)
>>> existing_event.description = "New description"
>>> new_event = Event(external_id="new_event", description="New event")
>>> res = c.events.upsert([existing_event, new_event], mode="replace")
Filter events
- EventsAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) EventList
- Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.
- Parameters
filter (Filter | dict) – Filter to apply.
sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. You can sort by up to two properties; the order defaults to ascending.
limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
List of events that match the filter criteria.
- Return type
EventList
Examples
Find all events that have an external id with prefix “workorder” and the word ‘failure’ in the description, and sort by start time descending:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_workorder = f.Prefix("external_id", "workorder")
>>> has_failure = f.Search("description", "failure")
>>> res = c.events.filter(filter=f.And(is_workorder, has_failure),
...                       sort=("start_time", "desc"))
Note that you can check the API documentation above to see which properties you can filter on with which filters.
To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the EventProperty and SortableEventProperty enums.
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty, SortableEventProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_workorder = f.Prefix(EventProperty.external_id, "workorder")
>>> has_failure = f.Search(EventProperty.description, "failure")
>>> res = c.events.filter(filter=f.And(is_workorder, has_failure),
...                       sort=(SortableEventProperty.start_time, "desc"))
Events Data classes
- class cognite.client.data_classes.events.EndTimeFilter(max: int | None = None, min: int | None = None, is_null: bool | None = None, **kwargs: Any)
Bases:
dict
Either range between two timestamps or isNull filter condition.
- Parameters
max (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
min (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
is_null (bool | None) – Set to true if you want to search for data with field value not set, false to search for cases where some value is present.
**kwargs (Any) – No description.
- class cognite.client.data_classes.events.Event(external_id: str | None = None, data_set_id: int | None = None, start_time: int | None = None, end_time: int | None = None, type: str | None = None, subtype: str | None = None, description: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, source: str | None = None, id: int | None = None, last_updated_time: int | None = None, created_time: int | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
An event represents something that happened at a given interval in time, e.g. a failure, a work order, etc.
- Parameters
external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.
data_set_id (int | None) – The id of the dataset this event belongs to.
start_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
end_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
type (str | None) – Type of the event, e.g. ‘failure’.
subtype (str | None) – Subtype of the event, e.g. ‘electrical’.
description (str | None) – Textual description of the event.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 128000 bytes, up to 256 key-value pairs, of total size at most 200000.
asset_ids (Sequence[int] | None) – Asset IDs of equipment that this event relates to.
source (str | None) – The source of this event.
id (int | None) – A server-generated ID for the object.
last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
cognite_client (CogniteClient | None) – The client to associate with this object.
- class cognite.client.data_classes.events.EventFilter(start_time: dict[str, Any] | TimestampRange | None = None, end_time: dict[str, Any] | EndTimeFilter | None = None, active_at_time: dict[str, Any] | TimestampRange | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, source: str | None = None, type: str | None = None, subtype: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None)
Bases:
CogniteFilter
Filter events with exact match.
- Parameters
start_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
end_time (dict[str, Any] | EndTimeFilter | None) – Either range between two timestamps or isNull filter condition.
active_at_time (dict[str, Any] | TimestampRange | None) – Event is considered active from its startTime to endTime inclusive. If startTime is null, event is never active. If endTime is null, event is active from startTime onwards. activeAtTime filter will match all events that are active at some point from min to max, from min, or to max, depending on which of min and max parameters are specified.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 128000 bytes, up to 256 key-value pairs, of total size at most 200000.
asset_ids (Sequence[int] | None) – Asset IDs of equipment that this event relates to.
asset_external_ids (Sequence[str] | None) – Asset External IDs of equipment that this event relates to.
asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include events that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.
data_set_ids (Sequence[dict[str, Any]] | None) – Only include events that belong to these datasets.
source (str | None) – The source of this event.
type (str | None) – Type of the event, e.g. ‘failure’.
subtype (str | None) – Subtype of the event, e.g. ‘electrical’.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
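As an editorial illustration (not part of the original reference), here is a minimal sketch combining EventFilter with the EndTimeFilter described above to search for ongoing events; the type value "failure" is a hypothetical example:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import EndTimeFilter, EventFilter
>>> c = CogniteClient()
>>> # Ongoing events (no end time set yet) of a hypothetical type
>>> flt = EventFilter(type="failure", end_time=EndTimeFilter(is_null=True))
>>> res = c.events.search(filter=flt)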
- class cognite.client.data_classes.events.EventList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[Event], IdTransformerMixin
- class cognite.client.data_classes.events.EventProperty(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.events.EventUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
Changes will be applied to the event.
- Parameters
id (int) – A server-generated ID for the object.
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
- class cognite.client.data_classes.events.SortableEventProperty(value)
Bases:
EnumProperty
An enumeration.
Data points
Warning
- TimeSeries unit support is a new feature:
The API specification is in beta.
The SDK implementation is in alpha.
Unit conversion is implemented in the Datapoints APIs with the parameters target_unit and target_unit_system in the retrieve methods below. It is only the use of these arguments that is in alpha. Using the methods below without these arguments is stable.
Thus, breaking changes may occur without further notice, see Alpha and Beta Features for more information.
Retrieve datapoints
- DatapointsAPI.retrieve(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False) Datapoints | DatapointsList | None
Retrieve datapoints for one or more time series.
- Performance guide:
In order to retrieve millions of datapoints as efficiently as possible, here are a few guidelines:
For best speed, and significantly lower memory usage, consider using retrieve_arrays(...), which uses numpy.ndarrays for data storage.
Only unlimited queries (limit=None) are fetched in parallel, so specifying a large finite limit, like 1 million, comes with a severe performance penalty, as data is fetched serially.
Try to avoid specifying start and end to be very far from the actual data: If you have data from 2000 to 2015, don’t set start=0 (1970). The first two guidelines are illustrated in the sketch below.
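A minimal sketch of the first two guidelines (an editorial addition, not from the original reference; the external_id "my-dense-ts" is a hypothetical example):
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # Unlimited query: fetched in parallel, backed by numpy arrays
>>> fast = client.time_series.data.retrieve_arrays(
...     external_id="my-dense-ts", start="30d-ago", limit=None)
>>> # Large finite limit: fetched serially - expect this to be much slower
>>> slow = client.time_series.data.retrieve(
...     external_id="my-dense-ts", start="30d-ago", limit=1_000_000)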
- Parameters
id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.
start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime | None) – Exclusive end. Default: “now”
aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)
granularity (str | None) – The granularity to fetch aggregates at, e.g. ‘15s’, ‘2h’, ‘10d’. Default: None.
target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.
limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)
include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False
ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False
- Returns
A Datapoints object containing the requested data, or a DatapointsList if multiple time series were asked for (the ordering is ids first, then external_ids). If ignore_unknown_ids is True and a single time series is requested but not found, the function will return None.
- Return type
Datapoints | DatapointsList | None
Examples
You can specify the identifiers of the datapoints you wish to retrieve in a number of ways. In this example we are using the time-ago format to get raw data for the time series with id=42 from 2 weeks ago up until now:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps = client.time_series.data.retrieve(id=42, start="2w-ago")
You can also get aggregated values, such as max or average. You may also fetch more than one time series simultaneously. Here we are getting daily averages and maximum values for all of 2018, for two different time series, where we’re specifying start and end as integers (milliseconds after epoch). Note that we are fetching them using their external ids:
>>> dps_lst = client.time_series.data.retrieve(
...     external_id=["foo", "bar"],
...     start=1514764800000,
...     end=1546300800000,
...     aggregates=["max", "average"],
...     granularity="1d")
In the two code examples above, we have a dps object (an instance of Datapoints), and a dps_lst object (an instance of DatapointsList). On dps, which in this case contains raw datapoints, you may access the underlying data directly by using the .value attribute. This works for both numeric and string (raw) datapoints, but not aggregates - they must be accessed by their respective names, because you’re allowed to fetch up to 10 aggregates simultaneously, and they are stored on the same object:
>>> raw_data = dps.value
>>> first_dps = dps_lst[0]  # optionally: `dps_lst.get(external_id="foo")`
>>> avg_data = first_dps.average
>>> max_data = first_dps.max
You may also slice a Datapoints object (you get Datapoints back), or ask for “a row of data” at a single index in the same way you would do with a built-in list (you get a Datapoint object back, note the singular name). You’ll also get Datapoint objects when iterating through a Datapoints object, but this should generally be avoided (consider this a performance warning):
>>> dps_slice = dps[-10:]  # Last ten values
>>> dp = dps[3]  # The third value
>>> for dp in dps_slice:
...     pass  # do something!
All parameters can be individually set if you pass (one or more) dictionaries (even ignore_unknown_ids, contrary to the API). If you also pass top-level parameters, these will be overruled by the individual parameters (where both exist). You are free to mix any kind of ids and external ids: Single identifiers, single dictionaries and (mixed) lists of these.
Let’s say you want different aggregates and end-times for a few time series (when only fetching a single aggregate, you may pass the string directly for convenience):
>>> dps_lst = client.time_series.data.retrieve(
...     id=[
...         {"id": 42, "end": "1d-ago", "aggregates": "average"},
...         {"id": 69, "end": "2d-ago", "aggregates": ["average"]},
...         {"id": 96, "end": "3d-ago", "aggregates": ["min", "max", "count"]},
...     ],
...     external_id={"external_id": "foo", "aggregates": "max"},
...     start="5d-ago",
...     granularity="1h")
When requesting multiple time series, an easy way to get the datapoints of a specific one is to use the .get method on the returned DatapointsList object, then specify if you want id or external_id. Note: If you fetch a time series by using id, you can still access it with its external_id (and the opposite way around), if you know it:
>>> from datetime import datetime, timezone
>>> utc = timezone.utc
>>> dps_lst = client.time_series.data.retrieve(
...     start=datetime(1907, 10, 14, tzinfo=utc),
...     end=datetime(1907, 11, 6, tzinfo=utc),
...     id=[42, 43, 44, ..., 499, 500],
... )
>>> ts_350 = dps_lst.get(id=350)  # ``Datapoints`` object
…but what happens if you request some duplicate ids or external_ids? In this example we will show how to get data from multiple disconnected periods. Let’s say you’re tasked to train a machine learning model to recognize a specific failure mode of a system, and you want the training data to only be from certain periods (when an alarm was on/high). Assuming these alarms are stored as events in CDF, with both start- and end times, we can use these directly in the query.
After fetching, the .get method will return a list of Datapoints instead (assuming we have more than one event), in the same order, similar to how slicing works with non-unique indices on Pandas DataFrames:
>>> periods = client.events.list(type="alarm", subtype="pressure")
>>> sensor_xid = "foo-pressure-bar"
>>> dps_lst = client.time_series.data.retrieve(
...     id=[42, 43, 44],
...     external_id=[
...         {"external_id": sensor_xid, "start": ev.start_time, "end": ev.end_time}
...         for ev in periods
...     ])
>>> ts_44 = dps_lst.get(id=44)  # Single ``Datapoints`` object
>>> ts_lst = dps_lst.get(external_id=sensor_xid)  # List of ``len(periods)`` ``Datapoints`` objects
The API has an endpoint to “retrieve latest (before)”, but not “after”. Luckily, we can emulate that behaviour easily. Let’s say we have a very dense time series and do not want to fetch all of the available raw data (or fetch less precise aggregate data), just to get the very first datapoint of every month (from e.g. the year 2000 through 2010):
>>> import itertools
>>> month_starts = [
...     datetime(year, month, 1, tzinfo=utc)
...     for year, month in itertools.product(range(2000, 2011), range(1, 13))]
>>> dps_lst = client.time_series.data.retrieve(
...     external_id=[{"external_id": "foo", "start": start} for start in month_starts],
...     limit=1)
To get all historic and future datapoints for a time series, e.g. to do a backup, you may want to import the two integer constants: MIN_TIMESTAMP_MS and MAX_TIMESTAMP_MS, to make sure you do not miss any. Performance warning: This pattern of fetching datapoints from the entire valid time domain is slower and shouldn’t be used for regular “day-to-day” queries:
>>> from cognite.client.utils import MIN_TIMESTAMP_MS, MAX_TIMESTAMP_MS
>>> dps_backup = client.time_series.data.retrieve(
...     id=123,
...     start=MIN_TIMESTAMP_MS,
...     end=MAX_TIMESTAMP_MS + 1)  # end is exclusive
A final example showcases the flexibility of the retrieve endpoint with a highly customized query:
>>> ts1 = 1337
>>> ts2 = {
...     "id": 42,
...     "start": -12345,  # Overrides `start` arg. below
...     "end": "1h-ago",
...     "limit": 1000,  # Overrides `limit` arg. below
...     "include_outside_points": True,
... }
>>> ts3 = {
...     "id": 11,
...     "end": "1h-ago",
...     "aggregates": ["max"],
...     "granularity": "42h",
...     "include_outside_points": False,
...     "ignore_unknown_ids": True,  # Overrides `ignore_unknown_ids` arg. below
... }
>>> dps_lst = client.time_series.data.retrieve(
...     id=[ts1, ts2, ts3], start="2w-ago", limit=None, ignore_unknown_ids=False)
If we have created a time series with unit_external_id set, we can use the target_unit parameter to convert the datapoints to the desired unit. In the example below, we assume that the time series has unit_external_id ‘temperature:deg_c’ and id=42.
>>> client.time_series.data.retrieve(
...     id=42, start="2w-ago", limit=None, target_unit="temperature:deg_f")
Alternatively, we can use the ‘target_unit_system’ parameter to convert the datapoints to the desired unit system.
>>> client.time_series.data.retrieve(
...     id=42, start="2w-ago", limit=None, target_unit_system="Imperial")
Retrieve datapoints as numpy arrays
- DatapointsAPI.retrieve_arrays(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False) DatapointsArray | DatapointsArrayList | None
Retrieve datapoints for one or more time series.
Note: This method requires numpy to be installed.
- Parameters
id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.
start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime | None) – Exclusive end. Default: “now”
aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)
granularity (str | None) – The granularity to fetch aggregates at, e.g. ‘15s’, ‘2h’, ‘10d’. Default: None.
target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.
limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)
include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False
ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False
- Returns
A DatapointsArray object containing the requested data, or a DatapointsArrayList if multiple time series were asked for (the ordering is ids first, then external_ids). If ignore_unknown_ids is True and a single time series is requested but not found, the function will return None.
- Return type
DatapointsArray | DatapointsArrayList | None
Examples
Note: For many more usage examples, check out the retrieve() method, which accepts exactly the same arguments.
Get weekly min and max aggregates for a time series with id=42 since the beginning of 2020, then compute the range of values:
>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> client = CogniteClient()
>>> dps = client.time_series.data.retrieve_arrays(
...     id=42,
...     start=datetime(2020, 1, 1, tzinfo=timezone.utc),
...     aggregates=["min", "max"],
...     granularity="7d")
>>> weekly_range = dps.max - dps.min
Get up to 2 million raw datapoints for the last 48 hours for a noisy time series with external_id="ts-noisy", then use a small and a wide moving-average filter to smooth it out:
>>> import numpy as np
>>> dps = client.time_series.data.retrieve_arrays(
...     external_id="ts-noisy",
...     start="2d-ago",
...     limit=2_000_000)
>>> smooth = np.convolve(dps.value, np.ones(5) / 5)
>>> smoother = np.convolve(dps.value, np.ones(20) / 20)
Get raw datapoints for multiple time series that may or may not exist, from the last 2 hours, then find the largest gap between two consecutive values for all time series, also taking the previous value into account (outside point):
>>> id_lst = [42, 43, 44]
>>> dps_lst = client.time_series.data.retrieve_arrays(
...     id=id_lst,
...     start="2h-ago",
...     include_outside_points=True,
...     ignore_unknown_ids=True)
>>> largest_gaps = [np.max(np.diff(dps.timestamp)) for dps in dps_lst]
Get raw datapoints for a time series with external_id="bar" from the last 10 weeks, then convert to a pandas.Series (you can of course also use the to_pandas() convenience method if you want a pandas.DataFrame):
>>> import pandas as pd
>>> dps = client.time_series.data.retrieve_arrays(external_id="bar", start="10w-ago")
>>> series = pd.Series(dps.value, index=dps.timestamp)
Retrieve datapoints in pandas dataframe
- DatapointsAPI.retrieve_dataframe(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False, uniform_index: bool = False, include_aggregate_name: bool = True, include_granularity_name: bool = False, column_names: Literal['id', 'external_id'] = 'external_id') pd.DataFrame
Get datapoints directly in a pandas dataframe.
Note: If you have duplicated time series in your query, the dataframe columns will also contain duplicates.
- Parameters
id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.
start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime | None) – Exclusive end. Default: “now”
aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)
granularity (str | None) – The granularity to fetch aggregates at, e.g. ‘15s’, ‘2h’, ‘10d’. Default: None.
target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.
limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)
include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False
ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False
uniform_index (bool) – If only querying aggregates AND a single granularity is used AND no limit is used, specifying uniform_index=True will return a dataframe with an equidistant datetime index from the earliest start to the latest end (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False
include_aggregate_name (bool) – Include ‘aggregate’ in the column name, e.g. my-ts|average. Ignored for raw time series. Default: True
include_granularity_name (bool) – Include ‘granularity’ in the column name, e.g. my-ts|12h. Added after ‘aggregate’ when present. Ignored for raw time series. Default: False
column_names (Literal["id", "external_id"]) – Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: “external_id”
- Returns
A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids. For time series with multiple aggregates, they will be sorted in alphabetical order (“average” before “max”).
- Return type
pd.DataFrame
Examples
Get a pandas dataframe using a single id, and use this id as column name, with no more than 100 datapoints:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe(
...     id=12345,
...     start="2w-ago",
...     end="now",
...     limit=100,
...     column_names="id")
Get the pandas dataframe with a uniform index (fixed spacing between points) of 1 day, for two time series with individually specified aggregates, from 1990 through 2020:
>>> from datetime import datetime, timezone
>>> df = client.time_series.data.retrieve_dataframe(
...     external_id=[
...         {"external_id": "foo", "aggregates": ["discrete_variance"]},
...         {"external_id": "bar", "aggregates": ["total_variation", "continuous_variance"]},
...     ],
...     granularity="1d",
...     start=datetime(1990, 1, 1, tzinfo=timezone.utc),
...     end=datetime(2020, 12, 31, tzinfo=timezone.utc),
...     uniform_index=True)
Get a pandas dataframe containing the ‘average’ aggregate for two time series using a 30-day granularity, starting Jan 1, 1970 all the way up to present, without having the aggregate name in the column names:
>>> df = client.time_series.data.retrieve_dataframe(
...     external_id=["foo", "bar"],
...     aggregates=["average"],
...     granularity="30d",
...     include_aggregate_name=False)
Remember that pandas.Timestamp is a subclass of datetime, so you can use Timestamps as start and end arguments:
>>> import pandas as pd
>>> df = client.time_series.data.retrieve_dataframe(
...     external_id="foo",
...     start=pd.Timestamp("2023-01-01"),
...     end=pd.Timestamp("2023-02-01"),
... )
Retrieve datapoints in time zone in pandas dataframe
- DatapointsAPI.retrieve_dataframe_in_tz(*, id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, start: datetime, end: datetime, aggregates: Aggregate | str | Sequence[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, ignore_unknown_ids: bool = False, uniform_index: bool = False, include_aggregate_name: bool = True, include_granularity_name: bool = False, column_names: Literal['id', 'external_id'] = 'external_id') pd.DataFrame
Get datapoints directly in a pandas dataframe in the same time zone as start and end.
Note
This is a convenience method. It builds on top of the methods retrieve_arrays and retrieve_dataframe. It enables you to get correct aggregates in your local time zone with daily, weekly, monthly, quarterly, and yearly aggregates, with automatic handling of daylight saving time (DST) transitions. If your time zone observes DST, and your query crosses at least one DST boundary, granularities like “3 days” or “1 week”, that used to represent fixed durations, no longer do so. To understand why, let’s illustrate with an example: A typical time zone (above the equator) that observes DST will skip one hour ahead during spring, leading to a day that is only 23 hours long, and oppositely in the fall, turning back the clock one hour, yielding a 25-hour-long day.
- In short, this method works as follows:
Get the time zone from start and end (must be equal).
Split the time range from start to end into intervals based on DST boundaries.
Create a query for each interval and pass all to the retrieve_arrays method.
Stack the resulting arrays into a single column in the resulting DataFrame.
Warning
The queries to retrieve_arrays are translated to a multiple of hours. This means that time zones that are not a whole hour offset from UTC are not supported (yet). The same is true for time zones that observe DST with an offset from standard time that is not a multiple of 1 hour.
- Parameters
id (int | Sequence[int] | None) – ID or list of IDs.
external_id (str | Sequence[str] | None) – External ID or list of External IDs.
start (datetime) – Inclusive start, must be time zone aware.
end (datetime) – Exclusive end, must be time zone aware and have the same time zone as start.
aggregates (Aggregate | str | Sequence[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)
granularity (str | None) – The granularity to fetch aggregates at, supported are: second, minute, hour, day, week, month, quarter and year. Default: None.
target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.
ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False
uniform_index (bool) – If querying aggregates, specifying uniform_index=True will return a dataframe with an index with constant spacing between timestamps decided by granularity all the way from start to end (missing values will be NaNs). Default: False
include_aggregate_name (bool) – Include ‘aggregate’ in the column name, e.g. my-ts|average. Ignored for raw time series. Default: True
include_granularity_name (bool) – Include ‘granularity’ in the column name, e.g. my-ts|12h. Added after ‘aggregate’ when present. Ignored for raw time series. Default: False
column_names (Literal["id", "external_id"]) – Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: “external_id”
- Returns
A pandas DataFrame containing the requested time series with a DatetimeIndex localized in the given time zone.
- Return type
pd.DataFrame
Examples
Get a pandas dataframe in the time zone of Oslo, Norway:
>>> from cognite.client import CogniteClient
>>> from datetime import datetime
>>> # In Python >=3.9 you may import directly from `zoneinfo`
>>> from cognite.client.utils import ZoneInfo
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe_in_tz(
...     id=12345,
...     start=datetime(2023, 1, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     end=datetime(2023, 2, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     aggregates="average",
...     granularity="1week",
...     column_names="id")
Get a pandas dataframe with the sum and continuous variance of the time series with external id “foo” and “bar”, for each quarter from 2020 to 2022 returned in the time zone of Oslo, Norway:
>>> from cognite.client import CogniteClient
>>> from datetime import datetime
>>> # In Python >=3.9 you may import directly from `zoneinfo`
>>> from cognite.client.utils import ZoneInfo
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe_in_tz(
...     external_id=["foo", "bar"],
...     aggregates=["sum", "continuous_variance"],
...     granularity="1quarter",
...     start=datetime(2020, 1, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     end=datetime(2022, 12, 31, tzinfo=ZoneInfo("Europe/Oslo")))
Tip
You can also use shorter granularities such as second(s), minute(s), hour(s), which do not require any special handling of DST. The longer granularities at your disposal, which are adjusted for DST, are: day(s), week(s), month(s), quarter(s) and year(s). All the granularities support a one-letter version: s, m, h, d, w, q, and y, except for month, to avoid confusion with minutes. Furthermore, the granularity is expected to be given in lowercase.
Retrieve latest datapoint
- DatapointsAPI.retrieve_latest(id: int | LatestDatapointQuery | list[int | LatestDatapointQuery] | None = None, external_id: str | LatestDatapointQuery | list[str | LatestDatapointQuery] | None = None, before: None | int | str | datetime = None, ignore_unknown_ids: bool = False) Datapoints | DatapointsList | None
Get the latest datapoint for one or more time series
- Parameters
id (int | LatestDatapointQuery | list[int | LatestDatapointQuery] | None) – Id or list of ids.
external_id (str | LatestDatapointQuery | list[str | LatestDatapointQuery] | None) – External id or list of external ids.
before (None | int | str | datetime) – Get latest datapoint before this time. Not used when passing ‘LatestDatapointQuery’.
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
A Datapoints object containing the requested data, or a DatapointsList if multiple were requested. If ignore_unknown_ids is True and a single time series is requested but not found, the function will return None.
- Return type
Datapoints | DatapointsList | None
Examples
Getting the latest datapoint in a time series. This method returns a Datapoints object, so the datapoint will be the first element:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.data.retrieve_latest(id=1)[0]
You can also get the first datapoint before a specific time:
>>> res = c.time_series.data.retrieve_latest(id=1, before="2d-ago")[0]
You may also pass an instance of LatestDatapointQuery:
>>> from cognite.client.data_classes import LatestDatapointQuery
>>> res = c.time_series.data.retrieve_latest(id=LatestDatapointQuery(id=1, before=60_000))[0]
If you need the latest datapoint for multiple time series, simply give a list of ids. Note that we are using external ids here, but either will work:
>>> res = c.time_series.data.retrieve_latest(external_id=["abc", "def"])
>>> latest_abc = res[0][0]
>>> latest_def = res[1][0]
If you need to specify a different value of ‘before’ for each time series, you may pass several LatestDatapointQuery objects:
>>> from datetime import datetime, timezone
>>> id_queries = [
...     123,
...     LatestDatapointQuery(id=456, before="1w-ago"),
...     LatestDatapointQuery(id=789, before=datetime(2018, 1, 1, tzinfo=timezone.utc))]
>>> res = c.time_series.data.retrieve_latest(
...     id=id_queries,
...     external_id=LatestDatapointQuery(external_id="abc", before="3h-ago"))
Insert data points
- DatapointsAPI.insert(datapoints: Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime]] | Sequence[tuple[int | float | datetime, int | float | str]], id: int | None = None, external_id: str | None = None) None
Insert datapoints into a time series
Timestamps can be represented as milliseconds since epoch or datetime objects.
- Parameters
datapoints (Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime]] | Sequence[tuple[int | float | datetime, int | float | str]]) – The datapoints you wish to insert. Can either be a list of tuples, a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below.
id (int | None) – Id of time series to insert datapoints into.
external_id (str | None) – External id of time series to insert datapoints into.
Examples
Your datapoints can be a list of tuples where the first element is the timestamp and the second element is the value:
>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> c = CogniteClient()
>>> # With datetime objects:
>>> datapoints = [
...     (datetime(2018, 1, 1, tzinfo=timezone.utc), 1000),
...     (datetime(2018, 1, 2, tzinfo=timezone.utc), 2000),
... ]
>>> c.time_series.data.insert(datapoints, id=1)
>>> # With ms since epoch:
>>> datapoints = [(150000000000, 1000), (160000000000, 2000)]
>>> c.time_series.data.insert(datapoints, id=2)
Or they can be a list of dictionaries:
>>> datapoints = [
...     {"timestamp": 150000000000, "value": 1000},
...     {"timestamp": 160000000000, "value": 2000},
... ]
>>> c.time_series.data.insert(datapoints, external_id="abcd")
Or they can be a Datapoints or DatapointsArray object (with raw datapoints only). Note that the id or external_id set on these objects is not inspected/used (as it belongs to the “from” time series, not the “to” time series), so you must explicitly pass the identifier of the time series you want to insert into, which in this example is external_id="foo":
>>> data = c.time_series.data.retrieve(external_id="abc", start="1w-ago", end="now")
>>> c.time_series.data.insert(data, external_id="foo")
Insert data points into multiple time series
- DatapointsAPI.insert_multiple(datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) None
Insert datapoints into multiple time series
- Parameters
datapoints (list[dict[str, str | int | list | Datapoints | DatapointsArray]]) – The datapoints you wish to insert along with the ids of the time series. See examples below.
Examples
Your datapoints can be a list of dictionaries, each containing datapoints for a different (presumably) time series. These dictionaries must have the key “datapoints” (containing the data) specified as a Datapoints object, a DatapointsArray object, or a list of either tuples (timestamp, value) or dictionaries, {“timestamp”: ts, “value”: value}:
>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> client = CogniteClient()
>>> datapoints = []
>>> # With datetime objects and id
>>> datapoints.append(
...     {"id": 1, "datapoints": [
...         (datetime(2018, 1, 1, tzinfo=timezone.utc), 1000),
...         (datetime(2018, 1, 2, tzinfo=timezone.utc), 2000),
...     ]})
>>> # With ms since epoch and external_id:
>>> datapoints.append({"external_id": "foo", "datapoints": [(150000000000, 1000), (160000000000, 2000)]})
>>> # With raw data in a Datapoints object (or DatapointsArray):
>>> data_to_clone = client.time_series.data.retrieve(external_id="bar")
>>> datapoints.append({"external_id": "bar-clone", "datapoints": data_to_clone})
>>> client.time_series.data.insert_multiple(datapoints)
Insert pandas dataframe
- DatapointsAPI.insert_dataframe(df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) None
Insert a dataframe (columns must be unique).
The index of the dataframe must contain the timestamps (pd.DatetimeIndex). The names of the columns specify the ids or external ids of the time series to which the datapoints will be written.
Said time series must already exist.
- Parameters
df (pd.DataFrame) – Pandas DataFrame object containing the time series.
external_id_headers (bool) – Interpret the column names as external id. Pass False if using ids. Default: True.
dropna (bool) – Set to True to ignore NaNs in the given DataFrame, applied per column. Default: True.
Examples
Post a dataframe with white noise:
>>> import numpy as np
>>> import pandas as pd
>>> from cognite.client import CogniteClient
>>>
>>> client = CogniteClient()
>>> ts_xid = "my-foo-ts"
>>> idx = pd.date_range(start="2018-01-01", periods=100, freq="1d")
>>> noise = np.random.normal(0, 1, 100)
>>> df = pd.DataFrame({ts_xid: noise}, index=idx)
>>> client.time_series.data.insert_dataframe(df)
Delete a range of data points
- DatapointsAPI.delete_range(start: int | str | datetime, end: int | str | datetime, id: int | None = None, external_id: str | None = None) None
Delete a range of datapoints from a time series.
- Parameters
start (int | str | datetime) – Inclusive start of delete range
end (int | str | datetime) – Exclusive end of delete range
id (int | None) – Id of time series to delete data from
external_id (str | None) – External id of time series to delete data from
Examples
Deleting the last week of data from a time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.data.delete_range(start="1w-ago", end="now", id=1)
Delete ranges of data points
- DatapointsAPI.delete_ranges(ranges: list[dict[str, Any]]) None
Delete ranges of datapoints from multiple time series.
- Parameters
ranges (list[dict[str, Any]]) – The list of time series (given by id or external_id) along with the time range to delete for each. See examples below.
Examples
Each element in the list of ranges must specify either id or external_id, and a time range:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ranges = [{"id": 1, "start": "2d-ago", "end": "now"},
...           {"external_id": "abc", "start": "2d-ago", "end": "now"}]
>>> c.time_series.data.delete_ranges(ranges)
Data Points Data classes
- class cognite.client.data_classes.datapoints.Datapoint(timestamp: int | None = None, value: str | float | None = None, average: float | None = None, max: float | None = None, min: float | None = None, count: int | None = None, sum: float | None = None, interpolation: float | None = None, step_interpolation: float | None = None, continuous_variance: float | None = None, discrete_variance: float | None = None, total_variation: float | None = None)
Bases:
CogniteResource
An object representing a datapoint.
- Parameters
timestamp (int | None) – The data timestamp in milliseconds since the epoch (Jan 1, 1970). Can be negative to define a date before 1970. Minimum timestamp is 1900.01.01 00:00:00 UTC
value (str | float | None) – The data value. Can be string or numeric
average (float | None) – The integral average value in the aggregate period
max (float | None) – The maximum value in the aggregate period
min (float | None) – The minimum value in the aggregate period
count (int | None) – The number of datapoints in the aggregate period
sum (float | None) – The sum of the datapoints in the aggregate period
interpolation (float | None) – The interpolated value of the series in the beginning of the aggregate
step_interpolation (float | None) – The last value before or at the beginning of the aggregate.
continuous_variance (float | None) – The variance of the interpolated underlying function.
discrete_variance (float | None) – The variance of the datapoint values.
total_variation (float | None) – The total variation of the interpolated underlying function.
- to_pandas(camel_case: bool = False) pandas.DataFrame
Convert the datapoint into a pandas DataFrame.
- Parameters
camel_case (bool) – Convert column names to camel case (e.g. stepInterpolation instead of step_interpolation)
- Returns
The datapoint as a pandas DataFrame.
- Return type
pandas.DataFrame
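A minimal usage sketch (editorial addition, not from the original reference; assumes a time series with id=1 exists and has datapoints):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dp = c.time_series.data.retrieve_latest(id=1)[0]  # a single Datapoint
>>> df = dp.to_pandas()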
- class cognite.client.data_classes.datapoints.Datapoints(id: int | None = None, external_id: str | None = None, is_string: bool | None = None, is_step: bool | None = None, unit: str | None = None, unit_external_id: str | None = None, granularity: str | None = None, timestamp: Sequence[int] | None = None, value: Sequence[str] | Sequence[float] | None = None, average: list[float] | None = None, max: list[float] | None = None, min: list[float] | None = None, count: list[int] | None = None, sum: list[float] | None = None, interpolation: list[float] | None = None, step_interpolation: list[float] | None = None, continuous_variance: list[float] | None = None, discrete_variance: list[float] | None = None, total_variation: list[float] | None = None, error: list[None | str] | None = None)
Bases:
CogniteResource
An object representing a list of datapoints.
- Parameters
id (int | None) – Id of the timeseries the datapoints belong to
external_id (str | None) – External id of the timeseries the datapoints belong to
is_string (bool | None) – Whether the time series is string valued or not.
is_step (bool | None) – Whether the time series is a step series or not.
unit (str | None) – The physical unit of the time series (free-text field).
unit_external_id (str | None) – The unit_external_id (as defined in the unit catalog) of the returned data points. If the datapoints were converted to a compatible unit, this will equal the converted unit, not the one defined on the time series.
granularity (str | None) – The granularity of the aggregate datapoints (does not apply to raw data)
timestamp (Sequence[int] | None) – The data timestamps in milliseconds since the epoch (Jan 1, 1970). Can be negative to define a date before 1970. Minimum timestamp is 1900.01.01 00:00:00 UTC
value (Sequence[str] | Sequence[float] | None) – The data values. Can be string or numeric
average (list[float] | None) – The integral average values in the aggregate period
max (list[float] | None) – The maximum values in the aggregate period
min (list[float] | None) – The minimum values in the aggregate period
count (list[int] | None) – The number of datapoints in the aggregate periods
sum (list[float] | None) – The sum of the datapoints in the aggregate periods
interpolation (list[float] | None) – The interpolated values of the series in the beginning of the aggregates
step_interpolation (list[float] | None) – The last values before or at the beginning of the aggregates.
continuous_variance (list[float] | None) – The variance of the interpolated underlying function.
discrete_variance (list[float] | None) – The variance of the datapoint values.
total_variation (list[float] | None) – The total variation of the interpolated underlying function.
error (list[None | str] | None) – No description.
- dump(camel_case: bool = False) dict[str, Any]
Dump the datapoints into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representing the instance.
- Return type
dict[str, Any]
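A minimal usage sketch (editorial addition, not from the original reference; assumes a time series with id=1 exists):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.retrieve(id=1, start="1d-ago", end="now")
>>> dumped = dps.dump(camel_case=True)  # attribute names in camelCase, e.g. 'externalId'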
- to_pandas(column_names: str = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False, include_errors: bool = False) pandas.DataFrame
Convert the datapoints into a pandas DataFrame.
- Parameters
column_names (str) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.
include_aggregate_name (bool) – Include aggregate in the column name
include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)
include_errors (bool) – For synthetic datapoint queries, include a column with errors.
- Returns
The dataframe.
- Return type
pandas.DataFrame
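For illustration, a sketch of to_pandas on aggregate data (editorial addition; assumes a time series with id=1 exists):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.retrieve(
...     id=1, start="1w-ago", end="now", aggregates="average", granularity="1h")
>>> df = dps.to_pandas(include_aggregate_name=False)  # plain column name, no '|average'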
- class cognite.client.data_classes.datapoints.DatapointsArray(id: int | None = None, external_id: str | None = None, is_string: bool | None = None, is_step: bool | None = None, unit: str | None = None, unit_external_id: str | None = None, granularity: str | None = None, timestamp: NumpyDatetime64NSArray | None = None, value: NumpyFloat64Array | NumpyObjArray | None = None, average: NumpyFloat64Array | None = None, max: NumpyFloat64Array | None = None, min: NumpyFloat64Array | None = None, count: NumpyInt64Array | None = None, sum: NumpyFloat64Array | None = None, interpolation: NumpyFloat64Array | None = None, step_interpolation: NumpyFloat64Array | None = None, continuous_variance: NumpyFloat64Array | None = None, discrete_variance: NumpyFloat64Array | None = None, total_variation: NumpyFloat64Array | None = None)
Bases:
CogniteResource
An object representing datapoints using numpy arrays.
- dump(camel_case: bool = False, convert_timestamps: bool = False) dict[str, Any]
Dump the DatapointsArray into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Default: False.
convert_timestamps (bool) – Convert timestamps to ISO 8601 formatted strings. Default: False (returns as integer, milliseconds since epoch)
- Returns
A dictionary representing the instance.
- Return type
dict[str, Any]
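A minimal usage sketch (editorial addition, not from the original reference; assumes a time series with id=1 exists):
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> arr = client.time_series.data.retrieve_arrays(id=1, start="1d-ago", end="now")
>>> dumped = arr.dump(convert_timestamps=True)  # timestamps as ISO 8601 strings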
- to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame
Convert the DatapointsArray into a pandas DataFrame.
- Parameters
column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.
include_aggregate_name (bool) – Include aggregate in the column name
include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)
- Returns
The datapoints as a pandas DataFrame.
- Return type
pandas.DataFrame
- class cognite.client.data_classes.datapoints.DatapointsArrayList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[DatapointsArray]
- concat_duplicate_ids() None
Concatenates all arrays with duplicated IDs.
Arrays with the same ids are stacked in chronological order.
Caveat: This method is not guaranteed to preserve the order of the list.
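For illustration (editorial addition; assumes a time series with id=42 exists): querying the same id twice yields duplicate entries, which can then be concatenated:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps_lst = client.time_series.data.retrieve_arrays(
...     id=[{"id": 42, "end": "2d-ago"}, {"id": 42, "start": "2d-ago"}])
>>> dps_lst.concat_duplicate_ids()  # now a single, chronologically stacked entry for id=42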
- dump(camel_case: bool = False, convert_timestamps: bool = False) list[dict[str, Any]]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Default: False.
convert_timestamps (bool) – Convert timestamps to ISO 8601 formatted strings. Default: False (returns as integer, milliseconds since epoch)
- Returns
A list of dicts representing the instance.
- Return type
list[dict[str, Any]]
- get(id: int | None = None, external_id: str | None = None) DatapointsArray | list[DatapointsArray] | None
Get a specific DatapointsArray from this list by id or external_id.
Note: For duplicated time series, returns a list of DatapointsArray.
- Parameters
id (int | None) – The id of the item(s) to get.
external_id (str | None) – The external_id of the item(s) to get.
- Returns
The requested item(s)
- Return type
DatapointsArray | list[DatapointsArray] | None
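A minimal usage sketch (editorial addition; assumes time series with ids 42 and 43 and external_id "foo" exist):
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps_lst = client.time_series.data.retrieve_arrays(
...     id=[42, 43], external_id="foo", start="1h-ago")
>>> one = dps_lst.get(id=42)             # a single DatapointsArray
>>> also = dps_lst.get(external_id="foo")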
- to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame
Convert the DatapointsArrayList into a pandas DataFrame.
- Parameters
column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.
include_aggregate_name (bool) – Include aggregate in the column name
include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)
- Returns
The datapoints as a pandas DataFrame.
- Return type
pandas.DataFrame
- class cognite.client.data_classes.datapoints.DatapointsList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[Datapoints]
- get(id: int | None = None, external_id: str | None = None) Datapoints | list[Datapoints] | None
Get a specific Datapoints from this list by id or external_id.
Note: For duplicated time series, returns a list of Datapoints.
- Parameters
id (int | None) – The id of the item(s) to get.
external_id (str | None) – The external_id of the item(s) to get.
- Returns
The requested item(s)
- Return type
Datapoints | list[Datapoints] | None
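A sketch of the duplicated-id case (editorial addition; assumes a time series with id=42 exists):
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps_lst = client.time_series.data.retrieve(
...     id=[{"id": 42, "end": "1w-ago"}, {"id": 42, "start": "1w-ago"}])
>>> parts = dps_lst.get(id=42)  # a list of two Datapoints objects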
- to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame
Convert the datapoints list into a pandas DataFrame.
- Parameters
column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.
include_aggregate_name (bool) – Include aggregate in the column name
include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)
- Returns
The datapoints list as a pandas DataFrame.
- Return type
pandas.DataFrame
- class cognite.client.data_classes.datapoints.LatestDatapointQuery(id: int | None = None, external_id: str | None = None, before: None | int | str | datetime = None)
Bases:
object
Parameters describing a query for the latest datapoint from a time series.
Note
Pass either ID or external ID.
- Parameters
id (Optional[int]) – The internal ID of the time series to query.
external_id (Optional[str]) – The external ID of the time series to query.
before (Union[None, int, str, datetime]) – Get latest datapoint before this time. None means ‘now’.
Files
Retrieve file metadata by id
- FilesAPI.retrieve(id: int | None = None, external_id: str | None = None) FileMetadata | None
Retrieve a single file metadata by id.
- Parameters
id (int | None) – ID
external_id (str | None) – External ID
- Returns
Requested file metadata or None if it does not exist.
- Return type
FileMetadata | None
Examples
Get file metadata by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve(id=1)
Get file metadata by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve(external_id="1")
Retrieve multiple files’ metadata by id
- FilesAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) FileMetadataList
Retrieve metadata for multiple files by id.
- Parameters
ids (Sequence[int] | None) – IDs
external_ids (Sequence[str] | None) – External IDs
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested file metadata.
- Return type
FileMetadataList
Examples
Get file metadata by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve_multiple(ids=[1, 2, 3])
Get file metadata by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve_multiple(external_ids=["abc", "def"])
List files metadata
- FilesAPI.list(name: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, source_created_time: dict[str, Any] | TimestampRange | None = None, source_modified_time: dict[str, Any] | TimestampRange | None = None, uploaded_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None, directory_prefix: str | None = None, uploaded: bool | None = None, limit: int | None = 25) FileMetadataList
- Parameters
name (str | None) – Name of the file.
mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value
asset_ids (Sequence[int] | None) – Only include files that reference these specific asset IDs.
asset_external_ids (Sequence[str] | None) – Only include files that reference these specific asset external IDs.
asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.
asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.
data_set_ids (int | Sequence[int] | None) – Return only files in the specified data set(s) with this id / these ids.
data_set_external_ids (str | Sequence[str] | None) – Return only files in the specified data set(s) with this external id / these external ids.
labels (LabelFilter | None) – Return only the files matching the specified label filter(s).
geo_location (GeoLocationFilter | None) – Only include files matching the specified geographic relation.
source (str | None) – The source of this event.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
source_created_time (dict[str, Any] | TimestampRange | None) – Filter for files where the sourceCreatedTime field has been set and is within the specified range.
source_modified_time (dict[str, Any] | TimestampRange | None) – Filter for files where the sourceModifiedTime field has been set and is within the specified range.
uploaded_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps
external_id_prefix (str | None) – External Id provided by client. Should be unique within the project.
directory_prefix (str | None) – Filter by this (case-sensitive) prefix for the directory provided by the client.
uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API, it has no effect in a post body.
limit (int | None) – Max number of files to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The requested files.
- Return type
FileMetadataList
Examples
List files metadata and filter on external id prefix:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_list = c.files.list(limit=5, external_id_prefix="prefix")
Iterate over files metadata:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for file_metadata in c.files:
...     file_metadata # do something with the file metadata
Iterate over chunks of files metadata to reduce memory load:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for file_list in c.files(chunk_size=2500):
...     file_list # do something with the files
Filter files based on labels:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["WELL LOG", "VERIFIED"])
>>> file_list = c.files.list(labels=my_label_filter)
Filter files based on geoLocation:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import GeoLocationFilter, GeometryFilter
>>> c = CogniteClient()
>>> my_geo_location_filter = GeoLocationFilter(relation="intersects", shape=GeometryFilter(type="Point", coordinates=[35,10]))
>>> file_list = c.files.list(geo_location=my_geo_location_filter)
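These filters can be combined freely. A minimal sketch filtering on mime type and creation time; the bounds below are placeholder epoch-millisecond values:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimestampRange
>>> c = CogniteClient()
>>> pdf_files = c.files.list(
...     mime_type="application/pdf",
...     created_time=TimestampRange(min=1609459200000, max=1640995200000),  # placeholder bounds
...     uploaded=True,
...     limit=None
... )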
Aggregate files metadata
- FilesAPI.aggregate(filter: FileMetadataFilter | dict | None = None) list[FileAggregate]
- Parameters
filter (FileMetadataFilter | dict | None) – Filter to apply; performs an exact match on file metadata fields.
- Returns
List of file aggregates
- Return type
list[FileAggregate]
Examples
Aggregate the count of uploaded files:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_uploaded = c.files.aggregate(filter={"uploaded": True})
Search for files
- FilesAPI.search(name: str | None = None, filter: FileMetadataFilter | dict | None = None, limit: int = 25) FileMetadataList
Search for files. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.
- Parameters
name (str | None) – Prefix and fuzzy search on name.
filter (FileMetadataFilter | dict | None) – Filter to apply. Performs exact match on these fields.
limit (int) – Max number of results to return.
- Returns
List of requested files metadata.
- Return type
FileMetadataList
Examples
Search for a file:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.search(name="some name")
Search for a file with an attached label:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataFilter, LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["WELL LOG"])
>>> res = c.files.search(name="xyz", filter=FileMetadataFilter(labels=my_label_filter))
Create file metadata
- FilesAPI.create(file_metadata: FileMetadata, overwrite: bool = False) tuple[FileMetadata, str]
Create file without uploading content.
- Parameters
file_metadata (FileMetadata) – File metadata for the file to create.
overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies a ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.
- Returns
Tuple containing the file metadata and upload url of the created file.
- Return type
tuple[FileMetadata, str]
Examples
Create a file:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadata
>>> c = CogniteClient()
>>> file_metadata = FileMetadata(name="MyFile")
>>> res = c.files.create(file_metadata)
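Because create returns both the metadata and an upload URL, the content can be sent separately. A hedged sketch using requests; that the URL accepts an HTTP PUT of the raw bytes is an assumption, and the path is a placeholder:
>>> import requests
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadata
>>> c = CogniteClient()
>>> file_metadata, upload_url = c.files.create(FileMetadata(name="MyFile"))
>>> with open("/path/to/file", "rb") as fh:  # placeholder path
...     requests.put(upload_url, data=fh)  # assumed upload protocol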
Upload a file or directory
- FilesAPI.upload(path: str, external_id: str | None = None, name: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, security_categories: Sequence[int] | None = None, recursive: bool = False, overwrite: bool = False) FileMetadata | FileMetadataList
- Parameters
path (str) – Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.
external_id (str | None) – The external ID provided by the client. Must be unique within the project.
name (str | None) – Name of the file.
source (str | None) – The source of the file.
mime_type (str | None) – File type. E.g. text/plain, application/pdf, …
metadata (dict[str, str] | None) – Customizable extra data about the file. String key -> String value.
directory (str | None) – The directory to be associated with this file. Must be an absolute, unix-style path.
asset_ids (Sequence[int] | None) – No description.
source_created_time (int | None) – The timestamp for when the file was originally created in the source system.
source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.
data_set_id (int | None) – ID of the data set.
labels (Sequence[Label] | None) – A list of the labels associated with this resource item.
geo_location (GeoLocation | None) – The geographic metadata of the file.
security_categories (Sequence[int] | None) – Security categories to attach to this file.
recursive (bool) – If path is a directory, upload all contained files recursively.
overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies a ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. The actual file will be overwritten after successful upload. If there is no successful upload, the current file contents will be kept. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.
- Returns
The file metadata of the uploaded file(s).
- Return type
FileMetadata | FileMetadataList
Examples
Upload a file in a given path:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file", name="my_file")
If name is omitted, this method will use the name of the file:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file")
You can also upload all files in a directory by setting path to the path of a directory:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/my/directory")
Upload a file with a label:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Label
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file", name="my_file", labels=[Label(external_id="WELL LOG")])
Upload a file with a geo_location:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import GeoLocation, Geometry
>>> c = CogniteClient()
>>> geometry = Geometry(type="LineString", coordinates=[[30, 10], [10, 30], [40, 40]])
>>> res = c.files.upload("/path/to/file", geo_location=GeoLocation(type="Feature", geometry=geometry))
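When path points at a directory tree, recursive=True uploads everything below it. A minimal sketch; the path and metadata values are placeholders, and the metadata is applied to every uploaded file:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload(
...     "/path/to/my/directory",  # placeholder path
...     recursive=True,
...     metadata={"origin": "bulk-import"}  # placeholder metadata
... )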
Upload a string or bytes
- FilesAPI.upload_bytes(content: str | bytes | TextIO | BinaryIO, name: str, external_id: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, security_categories: Sequence[int] | None = None, overwrite: bool = False) FileMetadata
Upload bytes or string.
You can also pass a file handle to content.
- Parameters
content (str | bytes | TextIO | BinaryIO) – The content to upload.
name (str) – Name of the file.
external_id (str | None) – The external ID provided by the client. Must be unique within the project.
source (str | None) – The source of the file.
mime_type (str | None) – File type. E.g. text/plain, application/pdf,…
metadata (dict[str, str] | None) – Customizable extra data about the file. String key -> String value.
directory (str | None) – The directory to be associated with this file. Must be an absolute, unix-style path.
asset_ids (Sequence[int] | None) – No description.
data_set_id (int | None) – Id of the data set.
labels (Sequence[Label] | None) – A list of the labels associated with this resource item.
geo_location (GeoLocation | None) – The geographic metadata of the file.
source_created_time (int | None) – The timestamp for when the file was originally created in the source system.
source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.
security_categories (Sequence[int] | None) – Security categories to attach to this file.
overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies a ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. The actual file will be overwritten after successful upload. If there is no successful upload, the current file contents will be kept. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.
- Returns
The file metadata of the uploaded file.
- Return type
FileMetadata
Examples
Upload a file from memory:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload_bytes(b"some content", name="my_file", asset_ids=[1,2,3])
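Plain strings work the same way; for instance, a small sketch uploading CSV text with an explicit mime type (the name and content are placeholders):
>>> res = c.files.upload_bytes("col1,col2\n1,2", name="table.csv", mime_type="text/csv")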
Retrieve download urls
- FilesAPI.retrieve_download_urls(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, extended_expiration: bool = False) dict[int | str, str]
Get download links by id or external id
- Parameters
id (int | Sequence[int] | None) – Id or list of ids.
external_id (str | Sequence[str] | None) – External id or list of external ids.
extended_expiration (bool) – Extend expiration time of download url to 1 hour. Defaults to false.
- Returns
Dictionary containing download urls.
- Return type
dict[int | str, str]
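Examples
Get download urls for a set of files; a minimal sketch, with placeholder ids:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> urls = c.files.retrieve_download_urls(id=[1, 2], external_id="abc")
>>> urls[1]  # the mapping is keyed by the id / external id you passed in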
Download files to disk
- FilesAPI.download(directory: str | Path, id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, keep_directory_structure: bool = False, resolve_duplicate_file_names: bool = False) None
Download files by id or external id.
This method will stream all files to disk, never keeping more than 2MB in memory per worker. The files will be stored in the provided directory using the file name retrieved from the file metadata in CDF. You can also choose to keep the directory structure from CDF so that the files will be stored in subdirectories matching the directory attribute on the files. When missing, the (root) directory is used. By default, duplicate file names to the same local folder will be resolved by only keeping one of the files. You can choose to resolve this by appending a number to the file name using the resolve_duplicate_file_names argument.
Warning
If you are downloading several files at once, be aware that file name collisions lead to all-but-one of the files missing. A warning is issued when this happens, listing the affected files.
- Parameters
directory (str | Path) – Directory to download the file(s) to.
id (int | Sequence[int] | None) – Id or list of ids
external_id (str | Sequence[str] | None) – External ID or list of external ids.
keep_directory_structure (bool) – Whether or not to keep the directory hierarchy in CDF, creating subdirectories as needed below the given directory.
resolve_duplicate_file_names (bool) – Whether or not to resolve duplicate file names by appending a number on duplicate file names
Examples
Download files by id and external id into directory ‘my_directory’:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.download(directory="my_directory", id=[1,2,3], external_id=["abc", "def"])
Download files by id to the current directory:
>>> c.files.download(directory=".", id=[1,2,3])
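To mirror the CDF directory attribute below the target directory, set keep_directory_structure (the ids below are placeholders):
>>> c.files.download(directory="my_directory", id=[1,2,3], keep_directory_structure=True)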
Download a single file to a specific path
- FilesAPI.download_to_path(path: Path | str, id: int | None = None, external_id: str | None = None) None
Download a file to a specific target.
- Parameters
path (Path | str) – The path in which to place the file.
id (int | None) – Id of the file to download.
external_id (str | None) – External id of the file to download.
Examples
Download a file by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.download_to_path("~/mydir/my_downloaded_file.txt", id=123)
Download a file as bytes
- FilesAPI.download_bytes(id: int | None = None, external_id: str | None = None) bytes
Download a file as bytes.
- Parameters
id (int | None) – Id of the file
external_id (str | None) – External id of the file
- Returns
The file content as bytes.
- Return type
bytes
Examples
Download a file’s content into memory:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_content = c.files.download_bytes(id=1)
Delete files
- FilesAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None) None
- Parameters
id (int | Sequence[int] | None) – Id or list of ids
external_id (str | Sequence[str] | None) – External id or list of external ids
Examples
Delete files by id or external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.delete(id=[1,2,3], external_id="3")
Update files metadata
- FilesAPI.update(item: FileMetadata | FileMetadataUpdate) FileMetadata
- FilesAPI.update(item: Sequence[FileMetadata | FileMetadataUpdate]) FileMetadataList
Update files. Currently, a full replacement of labels on a file is not supported (only partial add/remove updates). See the example below on how to perform a partial labels update.
- Parameters
item (FileMetadata | FileMetadataUpdate | Sequence[FileMetadata | FileMetadataUpdate]) – file(s) to update.
- Returns
The updated files.
- Return type
FileMetadata | FileMetadataList
Examples
Update file metadata that you have fetched. This will perform a full update of the file metadata:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_metadata = c.files.retrieve(id=1)
>>> file_metadata.description = "New description"
>>> res = c.files.update(file_metadata)
Perform a partial update on file metadata, updating the source and adding a new field to metadata:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).source.set("new source").metadata.add({"key": "value"})
>>> res = c.files.update(my_update)
Attach labels to a file:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).labels.add(["PUMP", "VERIFIED"])
>>> res = c.files.update(my_update)
Detach a single label from a file:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).labels.remove("PUMP")
>>> res = c.files.update(my_update)
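Since update also accepts a sequence (see the second overload above), several files can be patched in one call; a minimal sketch with placeholder ids and values:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> updates = [
...     FileMetadataUpdate(id=1).source.set("new source"),  # placeholder id
...     FileMetadataUpdate(id=2).metadata.add({"key": "value"})  # placeholder id
... ]
>>> res = c.files.update(updates)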
Files Data classes
- class cognite.client.data_classes.files.FileAggregate(count: int | None = None, **kwargs: Any)
Bases:
dict
Aggregation results for files
- Parameters
count (int | None) – Number of filtered items included in aggregation
**kwargs (Any) – No description.
- class cognite.client.data_classes.files.FileMetadata(external_id: str | None = None, name: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, security_categories: Sequence[int] | None = None, id: int | None = None, uploaded: bool | None = None, uploaded_time: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
No description.
- Parameters
external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.
name (str | None) – Name of the file.
source (str | None) – The source of the file.
mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.
directory (str | None) – Directory associated with the file. Must be an absolute, unix-style path.
asset_ids (Sequence[int] | None) – No description.
data_set_id (int | None) – The dataSet Id for the item.
labels (Sequence[Label] | None) – A list of the labels associated with this resource item.
geo_location (GeoLocation | None) – The geographic metadata of the file.
source_created_time (int | None) – The timestamp for when the file was originally created in the source system.
source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.
security_categories (Sequence[int] | None) – The security category IDs required to access this file.
id (int | None) – A server-generated ID for the object.
uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API, it has no effect in a post body.
uploaded_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
cognite_client (CogniteClient | None) – The client to associate with this object.
- class cognite.client.data_classes.files.FileMetadataFilter(name: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, uploaded_time: dict[str, Any] | TimestampRange | None = None, source_created_time: dict[str, Any] | None = None, source_modified_time: dict[str, Any] | None = None, external_id_prefix: str | None = None, directory_prefix: str | None = None, uploaded: bool | None = None)
Bases:
CogniteFilter
No description.
- Parameters
name (str | None) – Name of the file.
mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.
asset_ids (Sequence[int] | None) – Only include files that reference these specific asset IDs.
asset_external_ids (Sequence[str] | None) – Only include files that reference these specific asset external IDs.
data_set_ids (Sequence[dict[str, Any]] | None) – Only include files that belong to these datasets.
labels (LabelFilter | None) – Return only the files matching the specified label(s).
geo_location (GeoLocationFilter | None) – Only include files matching the specified geographic relation.
asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include files that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.
source (str | None) – The source of this event.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
uploaded_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
source_created_time (dict[str, Any] | None) – Filter for files where the sourceCreatedTime field has been set and is within the specified range.
source_modified_time (dict[str, Any] | None) – Filter for files where the sourceModifiedTime field has been set and is within the specified range.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
directory_prefix (str | None) – Filter by this (case-sensitive) prefix for the directory provided by the client.
uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API, it has no effect in a post body.
- dump(camel_case: bool = False) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.files.FileMetadataList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[FileMetadata], IdTransformerMixin
- class cognite.client.data_classes.files.FileMetadataUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
Changes will be applied to the file.
Geospatial
Note
Check https://github.com/cognitedata/geospatial-examples for some complete examples.
Create feature types
- GeospatialAPI.create_feature_types(feature_type: FeatureType) FeatureType
- GeospatialAPI.create_feature_types(feature_type: Sequence[FeatureType]) FeatureTypeList
Creates feature types <https://developer.cognite.com/api#tag/Geospatial/operation/createFeatureTypes>
- Parameters
feature_type (FeatureType | Sequence[FeatureType]) – feature type definition or list of feature type definitions to create.
- Returns
Created feature type definition(s)
- Return type
FeatureType | FeatureTypeList
Examples
Create new type definitions:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureType
>>> c = CogniteClient()
>>> feature_types = [
...     FeatureType(external_id="wells", properties={"location": {"type": "POINT", "srid": 4326}}),
...     FeatureType(
...         external_id="cities",
...         properties={"name": {"type": "STRING", "size": 10}},
...         search_spec={"name_index": {"properties": ["name"]}}
...     )
... ]
>>> res = c.geospatial.create_feature_types(feature_types)
Delete feature types
- GeospatialAPI.delete_feature_types(external_id: str | Sequence[str], recursive: bool = False) None
Delete one or more feature type <https://developer.cognite.com/api#tag/Geospatial/operation/GeospatialDeleteFeatureTypes>
- Parameters
external_id (str | Sequence[str]) – External ID or list of external ids
recursive (bool) – if true, the features of this type will also be dropped
Examples
Delete feature type definitions by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.delete_feature_types(external_id=["wells", "cities"])
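With recursive=True, any remaining features of the type are dropped along with the definition (the external id below is a placeholder):
>>> c.geospatial.delete_feature_types(external_id="wells", recursive=True)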
List feature types
- GeospatialAPI.list_feature_types() FeatureTypeList
List feature types <https://developer.cognite.com/api#tag/Geospatial/operation/listFeatureTypes>
- Returns
List of feature types
- Return type
FeatureTypeList
Examples
Iterate over feature type definitions:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for feature_type in c.geospatial.list_feature_types():
...     feature_type # do something with the feature type definition
Retrieve feature types
- GeospatialAPI.retrieve_feature_types(external_id: str) FeatureType
- GeospatialAPI.retrieve_feature_types(external_id: list[str]) FeatureTypeList
Retrieve feature types <https://developer.cognite.com/api#tag/Geospatial/operation/getFeatureTypesByIds>
- Parameters
external_id (str | list[str]) – External ID
- Returns
Requested Type or None if it does not exist.
- Return type
FeatureType | FeatureTypeList
Examples
Get Type by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.geospatial.retrieve_feature_types(external_id="1")
Update feature types
- GeospatialAPI.patch_feature_types(patch: FeatureTypePatch | Sequence[FeatureTypePatch]) FeatureTypeList
Patch feature types <https://developer.cognite.com/api#tag/Geospatial/operation/updateFeatureTypes>
- Parameters
patch (FeatureTypePatch | Sequence[FeatureTypePatch]) – the patch to apply
- Returns
The patched feature types.
- Return type
FeatureTypeList
Examples
Add one property to a feature type and add indexes:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> c = CogniteClient()
>>> res = c.geospatial.patch_feature_types(
...     patch=FeatureTypePatch(
...         external_id="wells",
...         property_patches=Patches(add={"altitude": {"type": "DOUBLE"}}),
...         search_spec_patches=Patches(
...             add={
...                 "altitude_idx": {"properties": ["altitude"]},
...                 "composite_idx": {"properties": ["location", "altitude"]}
...             }
...         )
...     )
... )
Add an additional index to an existing property:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> c = CogniteClient()
>>> res = c.geospatial.patch_feature_types(
...     patch=FeatureTypePatch(
...         external_id="wells",
...         search_spec_patches=Patches(add={"location_idx": {"properties": ["location"]}})
...     ))
Create features
- GeospatialAPI.create_features(feature_type_external_id: str, feature: Feature, allow_crs_transformation: bool = False, chunk_size: int | None = None) Feature
- GeospatialAPI.create_features(feature_type_external_id: str, feature: Sequence[Feature] | FeatureList, allow_crs_transformation: bool = False, chunk_size: int | None = None) FeatureList
Creates features <https://developer.cognite.com/api#tag/Geospatial/operation/createFeatures>
- Parameters
feature_type_external_id (str) – External id of the feature type the features belong to.
feature (Feature | Sequence[Feature] | FeatureList) – one feature or a list of features to create or a FeatureList object
allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.
chunk_size (int | None) – maximum number of items in a single request to the api
- Returns
Created features
- Return type
Feature | FeatureList
Examples
Create a new feature type and corresponding feature:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature, FeatureType
>>> c = CogniteClient()
>>> feature_types = [
...     FeatureType(
...         external_id="my_feature_type",
...         properties={
...             "location": {"type": "POINT", "srid": 4326},
...             "temperature": {"type": "DOUBLE"}
...         }
...     )
... ]
>>> res = c.geospatial.create_feature_types(feature_types)
>>> res = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(
...         external_id="my_feature",
...         location={"wkt": "POINT(1 1)"},
...         temperature=12.4
...     )
... )
Delete features
- GeospatialAPI.delete_features(feature_type_external_id: str, external_id: str | Sequence[str] | None = None) None
Delete one or more feature <https://developer.cognite.com/api#tag/Geospatial/operation/deleteFeatures>
- Parameters
feature_type_external_id (str) – External id of the feature type.
external_id (str | Sequence[str] | None) – External ID or list of external ids
Examples
Delete features by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.delete_features(
...     feature_type_external_id="my_feature_type",
...     external_id="my_feature"
... )
Retrieve features
- GeospatialAPI.retrieve_features(feature_type_external_id: str, external_id: str, properties: dict[str, Any] | None = None) Feature
- GeospatialAPI.retrieve_features(feature_type_external_id: str, external_id: list[str], properties: dict[str, Any] | None = None) FeatureList
Retrieve features <https://developer.cognite.com/api#tag/Geospatial/operation/getFeaturesByIds>
- Parameters
feature_type_external_id (str) – External id of the feature type.
external_id (str | list[str]) – External ID or list of external ids
properties (dict[str, Any] | None) – the output property selection
- Returns
Requested features or None if it does not exist.
- Return type
Feature | FeatureList
Examples
Retrieve one feature by its external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.retrieve_features(
...     feature_type_external_id="my_feature_type",
...     external_id="my_feature"
... )
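Retrieve several features at once and restrict the output properties; the external ids below are placeholders:
>>> res = c.geospatial.retrieve_features(
...     feature_type_external_id="my_feature_type",
...     external_id=["my_feature", "my_other_feature"],  # placeholder ids
...     properties={"temperature": {}}
... )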
Update features
- GeospatialAPI.update_features(feature_type_external_id: str, feature: Feature | Sequence[Feature], allow_crs_transformation: bool = False, chunk_size: int | None = None) FeatureList
Update features <https://developer.cognite.com/api#tag/Geospatial/operation/updateFeatures>
- Parameters
feature_type_external_id (str) – External id of the feature type.
feature (Feature | Sequence[Feature]) – feature or list of features.
allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.
chunk_size (int | None) – maximum number of items in a single request to the api
- Returns
Updated features
- Return type
FeatureList
Examples
Update one feature:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> my_updated_feature = c.geospatial.update_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=6.237)
... )
List features
- GeospatialAPI.list_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, limit: int | None = 25, allow_crs_transformation: bool = False) FeatureList
List features <https://developer.cognite.com/api#tag/Geospatial/operation/listFeatures>
This method allows you to filter all features.
- Parameters
feature_type_external_id (str) – the feature type to list features for
filter (dict[str, Any] | None) – the list filter
properties (dict[str, Any] | None) – the output property selection
limit (int | None) – Maximum number of features to return. Defaults to 25. Set to -1, float(“inf”) or None to return all features.
allow_crs_transformation (bool) – If true, then input geometries if existing in the filter will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.
- Returns
The filtered features
- Return type
FeatureList
Examples
List features:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature_type = c.geospatial.retrieve_feature_types(
...     external_id="my_feature_type"
... )
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id=my_feature_type.external_id,
...     feature=Feature(
...         external_id="my_feature",
...         temperature=12.4,
...         location={"wkt": "POINT(0 1)"}
...     )
... )
>>> res = c.geospatial.list_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in res:
...     f # do something with the features
Search for features and select output properties:
>>> res = c.geospatial.list_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )
Search for features with spatial filters:
>>> res = c.geospatial.list_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"stWithin": {
...         "property": "location",
...         "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...     }}
... )
Search features
- GeospatialAPI.search_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, limit: int = 25, order_by: Sequence[OrderSpec] | None = None, allow_crs_transformation: bool = False) FeatureList
Search for features <https://developer.cognite.com/api#tag/Geospatial/operation/searchFeatures>
This method allows you to order the results by one or more of the properties of the feature type. However, the number of items returned is limited to 1000 and there is no support for cursors yet. If you need to return more than 1000 items, use the stream_features(…) method instead.
- Parameters
feature_type_external_id (str) – The feature type to search for
filter (dict[str, Any] | None) – The search filter
properties (dict[str, Any] | None) – The output property selection
limit (int) – Maximum number of results
order_by (Sequence[OrderSpec] | None) – The order specification
allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.
- Returns
The filtered features.
- Return type
FeatureList
Examples
Search for features:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature_type = c.geospatial.retrieve_feature_types(
...     external_id="my_feature_type"
... )
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id=my_feature_type.external_id,
...     feature=Feature(
...         external_id="my_feature",
...         temperature=12.4,
...         location={"wkt": "POINT(0 1)"}
...     )
... )
>>> res = c.geospatial.search_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in res:
...     f # do something with the features
Search for features and select output properties:
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )
Search for features and order results:
>>> from cognite.client.data_classes.geospatial import OrderSpec
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     order_by=[
...         OrderSpec("temperature", "ASC"),
...         OrderSpec("pressure", "DESC")]
... )
Search for features with spatial filters:
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"stWithin": {
...         "property": "location",
...         "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...     }}
... )
Combining multiple filters:
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"and": [
...         {"range": {"property": "temperature", "gt": 12.0}},
...         {"stWithin": {
...             "property": "location",
...             "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...         }}
...     ]}
... )
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"or": [
...         {"range": {"property": "temperature", "gt": 12.0}},
...         {"stWithin": {
...             "property": "location",
...             "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...         }}
...     ]}
... )
Stream features
- GeospatialAPI.stream_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, allow_crs_transformation: bool = False) Iterator[Feature]
Stream features <https://developer.cognite.com/api#tag/Geospatial/operation/searchFeaturesStreaming>
This method allows you to return any number of items, until the underlying API call times out. The order of the result items is not deterministic. If you need to order the results, use the search_features(…) method instead.
- Parameters
feature_type_external_id (str) – the feature type to search for
filter (dict[str, Any] | None) – the search filter
properties (dict[str, Any] | None) – the output property selection
allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.
- Yields
Feature – a generator for the filtered features
Examples
Stream features:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> features = c.geospatial.stream_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in features:
...     f # do something with the features
Stream features and select output properties:
>>> features = c.geospatial.stream_features(
...     feature_type_external_id="my_feature_type",
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )
>>> for f in features:
...     f # do something with the features
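Because the generator never materializes the full result, it can, for example, count matching features with constant memory; a minimal sketch, reusing the placeholder feature type from above:
>>> n = sum(
...     1 for _ in c.geospatial.stream_features(
...         feature_type_external_id="my_feature_type",
...         filter={}
...     )
... )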
Aggregate features
- GeospatialAPI.aggregate_features(feature_type_external_id: str, property: str | None = None, aggregates: Sequence[str] | None = None, filter: dict[str, Any] | None = None, group_by: Sequence[str] | None = None, order_by: Sequence[OrderSpec] | None = None, output: dict[str, Any] | None = None) FeatureAggregateList
Aggregate filtered features <https://developer.cognite.com/api#tag/Geospatial/operation/aggregateFeatures>
- Parameters
feature_type_external_id (str) – the feature type to filter features from
property (str | None) – the property for which aggregates should be calculated
aggregates (Sequence[str] | None) – list of aggregates to be calculated
filter (dict[str, Any] | None) – the search filter
group_by (Sequence[str] | None) – list of properties to group by with
order_by (Sequence[OrderSpec] | None) – the order specification
output (dict[str, Any] | None) – the aggregate output
- Returns
The feature aggregates.
- Return type
FeatureAggregateList
Examples
Aggregate property of features:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature, OrderSpec
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> res_deprecated = c.geospatial.aggregate_features( # deprecated
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}},
...     property="temperature",
...     aggregates=["max", "min"],
...     group_by=["category"],
...     order_by=[OrderSpec("category", "ASC")]
... )
>>> res = c.geospatial.aggregate_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}},
...     group_by=["category"],
...     order_by=[OrderSpec("category", "ASC")],
...     output={"min_temperature": {"min": {"property": "temperature"}},
...         "max_volume": {"max": {"property": "volume"}}
...     }
... )
>>> for a in res:
...     a # loop over aggregates in different groups
Get coordinate reference systems
- GeospatialAPI.get_coordinate_reference_systems(srids: int | Sequence[int]) CoordinateReferenceSystemList
Get Coordinate Reference Systems <https://developer.cognite.com/api#tag/Geospatial/operation/getCoordinateReferenceSystem>
- Parameters
srids (int | Sequence[int]) – SRID or list of SRIDs
- Returns
Requested CRSs.
- Return type
CoordinateReferenceSystemList
Examples
Get two CRS definitions:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> crs = c.geospatial.get_coordinate_reference_systems(srids=[4326, 4327])
List coordinate reference systems
- GeospatialAPI.list_coordinate_reference_systems(only_custom: bool = False) CoordinateReferenceSystemList
List Coordinate Reference Systems <https://developer.cognite.com/api#tag/Geospatial/operation/listGeospatialCoordinateReferenceSystems>
- Parameters
only_custom (bool) – list only custom CRSs or not
- Returns
list of CRSs.
- Return type
CoordinateReferenceSystemList
Examples
Fetch all custom CRSs:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> crs = c.geospatial.list_coordinate_reference_systems(only_custom=True)
Create coordinate reference systems
- GeospatialAPI.create_coordinate_reference_systems(crs: CoordinateReferenceSystem | Sequence[CoordinateReferenceSystem]) CoordinateReferenceSystemList
Create Coordinate Reference System <https://developer.cognite.com/api#tag/Geospatial/operation/createGeospatialCoordinateReferenceSystems>
- Parameters
crs (CoordinateReferenceSystem | Sequence[CoordinateReferenceSystem]) – a CoordinateReferenceSystem or a list of CoordinateReferenceSystem
- Returns
list of CRSs.
- Return type
CoordinateReferenceSystemList
Examples
Create a custom CRS:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import CoordinateReferenceSystem
>>> c = CogniteClient()
>>> custom_crs = CoordinateReferenceSystem(
...     srid=121111,
...     wkt=(
...         'PROJCS["NTF (Paris) / Lambert zone II",'
...         ' GEOGCS["NTF (Paris)",'
...         ' DATUM["Nouvelle_Triangulation_Francaise_Paris",'
...         ' SPHEROID["Clarke 1880 (IGN)",6378249.2,293.4660212936265,'
...         ' AUTHORITY["EPSG","7011"]],'
...         ' TOWGS84[-168,-60,320,0,0,0,0],'
...         ' AUTHORITY["EPSG","6807"]],'
...         ' PRIMEM["Paris",2.33722917,'
...         ' AUTHORITY["EPSG","8903"]],'
...         ' UNIT["grad",0.01570796326794897,'
...         ' AUTHORITY["EPSG","9105"]], '
...         ' AUTHORITY["EPSG","4807"]],'
...         ' PROJECTION["Lambert_Conformal_Conic_1SP"],'
...         ' PARAMETER["latitude_of_origin",52],'
...         ' PARAMETER["central_meridian",0],'
...         ' PARAMETER["scale_factor",0.99987742],'
...         ' PARAMETER["false_easting",600000],'
...         ' PARAMETER["false_northing",2200000],'
...         ' UNIT["metre",1,'
...         ' AUTHORITY["EPSG","9001"]],'
...         ' AXIS["X",EAST],'
...         ' AXIS["Y",NORTH],'
...         ' AUTHORITY["EPSG","27572"]]'
...     ),
...     proj_string=(
...         '+proj=lcc +lat_1=46.8 +lat_0=46.8 +lon_0=0 +k_0=0.99987742 '
...         '+x_0=600000 +y_0=2200000 +a=6378249.2 +b=6356515 '
...         '+towgs84=-168,-60,320,0,0,0,0 +pm=paris +units=m +no_defs'
...     )
... )
>>> crs = c.geospatial.create_coordinate_reference_systems(custom_crs)
Put raster data
- GeospatialAPI.put_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str, raster_format: str, raster_srid: int, file: str, allow_crs_transformation: bool = False, raster_scale_x: float | None = None, raster_scale_y: float | None = None) RasterMetadata
Put raster <https://developer.cognite.com/api#tag/Geospatial/operation/putRaster>
- Parameters
feature_type_external_id (str) – External id of the feature type.
feature_external_id (str) – External id of the feature.
raster_property_name (str) – the raster property name
raster_format (str) – the raster input format
raster_srid (int) – the associated SRID for the raster
file (str) – the path to the file of the raster
allow_crs_transformation (bool) – When the parameter is false, requests with rasters in Coordinate Reference System different from the one defined in the feature type will result in bad request response code.
raster_scale_x (float | None) – the X component of the pixel width in units of coordinate reference system
raster_scale_y (float | None) – the Y component of the pixel height in units of coordinate reference system
- Returns
the raster metadata if it was ingested successfully
- Return type
RasterMetadata
Examples
Put a raster in a feature raster property:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> file = ...
>>> metadata = c.geospatial.put_raster(feature_type.external_id, feature.external_id,
...     raster_property_name, "XYZ", 3857, file)
Delete raster data
- GeospatialAPI.delete_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str) None
Delete raster <https://developer.cognite.com/api#tag/Geospatial/operation/deleteRaster>
- Parameters
feature_type_external_id (str) – External id of the feature type.
feature_external_id (str) – External id of the feature.
raster_property_name (str) – the raster property name
Examples
Delete a raster in a feature raster property:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> c.geospatial.delete_raster(feature_type.external_id, feature.external_id, raster_property_name)
Get raster data
- GeospatialAPI.get_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str, raster_format: str, raster_options: dict[str, Any] | None = None, raster_srid: int | None = None, raster_scale_x: float | None = None, raster_scale_y: float | None = None, allow_crs_transformation: bool = False) bytes
Get raster <https://developer.cognite.com/api#tag/Geospatial/operation/getRaster>
- Parameters
feature_type_external_id (str) – External id of the feature type.
feature_external_id (str) – External id of the feature.
raster_property_name (str) – the raster property name
raster_format (str) – the raster output format
raster_options (dict[str, Any] | None) – GDAL raster creation key-value options
raster_srid (int | None) – the SRID for the output raster
raster_scale_x (float | None) – the X component of the output pixel width in units of coordinate reference system
raster_scale_y (float | None) – the Y component of the output pixel height in units of coordinate reference system
allow_crs_transformation (bool) – When the parameter is false, requests with output rasters in Coordinate Reference System different from the one defined in the feature type will result in bad request response code.
- Returns
the raster data
- Return type
bytes
Examples
Get a raster from a feature raster property:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> raster_data = c.geospatial.get_raster(feature_type.external_id, feature.external_id,
...     raster_property_name, "XYZ", {"SIGNIFICANT_DIGITS": "4"})
Compute
- GeospatialAPI.compute(output: dict[str, GeospatialComputeFunction]) GeospatialComputedResponse
Compute <https://developer.cognite.com/api#tag/Geospatial/operation/compute>
- Parameters
output (dict[str, GeospatialComputeFunction]) – No description.
- Returns
Mapping of keys to computed items.
- Return type
GeospatialComputedResponse
Examples
Compute the transformation of an ewkt geometry from one SRID to another:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import GeospatialGeometryTransformComputeFunction, GeospatialGeometryValueComputeFunction
>>> c = CogniteClient()
>>> compute_function = GeospatialGeometryTransformComputeFunction(
...     GeospatialGeometryValueComputeFunction("SRID=4326;POLYGON((0 0,10 0,10 10,0 10,0 0))"),
...     srid=23031
... )
>>> compute_result = c.geospatial.compute(output={"output": compute_function})
Geospatial Data classes
- class cognite.client.data_classes.geospatial.CoordinateReferenceSystem(srid: int | None = None, wkt: str | None = None, proj_string: str | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A representation of a coordinate reference system in the geospatial api.
- class cognite.client.data_classes.geospatial.CoordinateReferenceSystemList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
- class cognite.client.data_classes.geospatial.Feature(external_id: str | None = None, cognite_client: CogniteClient | None = None, **properties: Any)
Bases:
CogniteResource
A representation of a feature in the geospatial api.
- dump(camel_case: bool = False) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.geospatial.FeatureAggregate(cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A result of aggregating features in geospatial api.
- class cognite.client.data_classes.geospatial.FeatureAggregateList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[FeatureAggregate]
- class cognite.client.data_classes.geospatial.FeatureList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[Feature]
- static from_geopandas(feature_type: FeatureType, geodataframe: geopandas.GeoDataFrame, external_id_column: str = 'externalId', property_column_mapping: dict[str, str] | None = None, data_set_id_column: str = 'dataSetId') FeatureList
Convert a GeoDataFrame instance into a FeatureList.
- Parameters
feature_type (FeatureType) – The feature type the features will conform to
geodataframe (geopandas.GeoDataFrame) – the geodataframe instance to convert into features
external_id_column (str) – the geodataframe column to use for the feature external id
property_column_mapping (dict[str, str] | None) – provides a mapping from featuretype property names to geodataframe columns
data_set_id_column (str) – the geodataframe column to use for the feature dataSet id
- Returns
The list of features converted from the geodataframe rows.
- Return type
FeatureList
Examples
Create features from a geopandas dataframe:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureList
>>> c = CogniteClient()
>>> my_feature_type = ... # some feature type with 'position' and 'temperature' properties
>>> my_geodataframe = ... # some geodataframe with 'center_xy', 'temp' and 'id' columns
>>> feature_list = FeatureList.from_geopandas(feature_type=my_feature_type,
...     geodataframe=my_geodataframe,
...     external_id_column="id",
...     data_set_id_column="dataSetId",
...     property_column_mapping={'position': 'center_xy', 'temperature': 'temp'})
>>> created_features = c.geospatial.create_features(my_feature_type.external_id, feature_list)
- to_geopandas(geometry: str, camel_case: bool = False) geopandas.GeoDataFrame
Convert the instance into a GeoPandas GeoDataFrame.
- Parameters
geometry (str) – The name of the feature type geometry property to use in the GeoDataFrame
camel_case (bool) – Convert column names to camel case (e.g. externalId instead of external_id)
- Returns
The GeoPandas GeoDataFrame.
- Return type
geopandas.GeoDataFrame
Examples
Convert a FeatureList into a GeoPandas GeoDataFrame:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> features = c.geospatial.search_features(...)
>>> gdf = features.to_geopandas(
...     geometry="position",
...     camel_case=False
... )
>>> gdf.head()
- class cognite.client.data_classes.geospatial.FeatureType(external_id: str | None = None, data_set_id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, properties: dict[str, Any] | None = None, search_spec: dict[str, Any] | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A representation of a feature type in the geospatial api.
- class cognite.client.data_classes.geospatial.FeatureTypeList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[FeatureType]
- class cognite.client.data_classes.geospatial.FeatureTypePatch(external_id: 'str | None' = None, property_patches: 'Patches | None' = None, search_spec_patches: 'Patches | None' = None)
Bases:
object
- class cognite.client.data_classes.geospatial.FeatureTypeUpdate(external_id: str | None = None, add: PropertyAndSearchSpec | None = None, remove: PropertyAndSearchSpec | None = None, cognite_client: CogniteClient | None = None)
Bases:
object
A representation of a feature type update in the geospatial api.
- class cognite.client.data_classes.geospatial.GeospatialComputeFunction
Bases:
ABC
A geospatial compute function
- abstract to_json_payload() dict
Convert function to json for request payload
- class cognite.client.data_classes.geospatial.GeospatialComputedItem(resource: dict[str, Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A representation of an item computed from geospatial.
- class cognite.client.data_classes.geospatial.GeospatialComputedItemList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[GeospatialComputedItem]
A list of items computed from geospatial.
- class cognite.client.data_classes.geospatial.GeospatialComputedResponse(computed_item_list: GeospatialComputedItemList, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
The geospatial compute response.
- class cognite.client.data_classes.geospatial.GeospatialGeometryComputeFunction
Bases:
GeospatialComputeFunction, ABC
A geospatial geometry compute function
- class cognite.client.data_classes.geospatial.GeospatialGeometryTransformComputeFunction(geospatial_geometry_compute_function: GeospatialComputeFunction, srid: int)
Bases:
GeospatialComputeFunction
A stTransform geospatial compute function
- to_json_payload() dict
Convert function to json for request payload
- class cognite.client.data_classes.geospatial.GeospatialGeometryValueComputeFunction(ewkt: str)
Bases:
GeospatialGeometryComputeFunction
A geospatial geometry value compute function. Accepts a well-known text of the geometry prefixed with a spatial reference identifier, see https://docs.geotools.org/stable/javadocs/org/opengis/referencing/doc-files/WKT.html
- Parameters
ewkt (str) – The extended well-known text (EWKT) representation of the geometry, e.g. "SRID=4326;POINT(2.294 48.858)".
- to_json_payload() dict
Convert function to json for request payload
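Example (a hedged sketch of composing the compute functions above; the c.geospatial.compute call and its output mapping are assumptions to check against your SDK version):
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import (
...     GeospatialGeometryTransformComputeFunction, GeospatialGeometryValueComputeFunction)
>>> c = CogniteClient()
>>> geom = GeospatialGeometryValueComputeFunction("SRID=4326;POLYGON((0 0,10 0,10 10,0 10,0 0))")
>>> res = c.geospatial.compute(
...     output={"output": GeospatialGeometryTransformComputeFunction(geom, srid=23031)})  # reproject to SRID 23031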
- class cognite.client.data_classes.geospatial.OrderSpec(property: str, direction: str)
Bases:
object
An order specification with respect to a property.
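Example (a minimal sketch; the "ASC"/"DESC" direction values and passing the list to a search call's ordering parameter are assumptions):
>>> from cognite.client.data_classes.geospatial import OrderSpec
>>> orders = [OrderSpec(property="temperature", direction="ASC")]  # "ASC"/"DESC" assumed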
- class cognite.client.data_classes.geospatial.Patches(add: 'dict[str, Any] | None' = None, remove: 'list[str] | None' = None)
Bases:
object
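Example (a hedged sketch of combining FeatureTypePatch and Patches to change a feature type's properties; the patch_feature_types call is an assumption to verify against your SDK version):
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> c = CogniteClient()
>>> res = c.geospatial.patch_feature_types(
...     patch=FeatureTypePatch(
...         external_id="my_feature_type",
...         property_patches=Patches(add={"altitude": {"type": "DOUBLE"}}, remove=["temperature"])))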
- class cognite.client.data_classes.geospatial.PropertyAndSearchSpec(properties: dict[str, Any] | list[str] | None = None, search_spec: dict[str, Any] | list[str] | None = None)
Bases:
object
A representation of a feature type property and search spec.
- class cognite.client.data_classes.geospatial.RasterMetadata(**properties: Any)
Bases:
object
Raster metadata
- cognite.client.data_classes.geospatial.nan_to_none(column_value: Any) Any
Convert NaN value to None.
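Example (a quick sketch of its behavior; handy when reading values out of a geodataframe, where missing entries are NaN):
>>> from cognite.client.data_classes.geospatial import nan_to_none
>>> nan_to_none(float("nan"))  # returns None
>>> nan_to_none(42.0)  # returns 42.0; non-NaN values pass through unchanged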
Synthetic time series
Calculate the result of a function on time series
- SyntheticDatapointsAPI.query(expressions: str | sympy.Expr | Sequence[str | sympy.Expr], start: int | str | datetime, end: int | str | datetime, limit: int | None = None, variables: dict[str, str | TimeSeries] | None = None, aggregate: str | None = None, granularity: str | None = None) Datapoints | DatapointsList
Calculate the result of a function on time series.
- Parameters
expressions (str | sympy.Expr | Sequence[str | sympy.Expr]) – Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API ts{} syntax, or contain variable names to be replaced using the variables parameter.
start (int | str | datetime) – Inclusive start.
end (int | str | datetime) – Exclusive end.
limit (int | None) – Number of datapoints per expression to retrieve.
variables (dict[str, str | TimeSeries] | None) – An optional map of symbol replacements.
aggregate (str | None) – Use this aggregate when replacing entries from variables; does not affect time series given in the ts{} syntax.
granularity (str | None) – Use this granularity together with the aggregate.
- Returns
Datapoints if a single expression was given, else a DatapointsList object containing the calculated data.
- Return type
Datapoints | DatapointsList
Examples
Request a synthetic time series query with direct syntax:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.synthetic.query(expressions="TS{id:123} + TS{externalId:'abc'}", start="2w-ago", end="now")
Use variables to re-use an expression:
>>> vars = {"A": "my_ts_external_id", "B": c.time_series.retrieve(id=1)}
>>> dps = c.time_series.data.synthetic.query(expressions="A+B", start="2w-ago", end="now", variables=vars)
Use sympy to build complex expressions:
>>> from sympy import symbols, cos, sin
>>> a = symbols('a')
>>> dps = c.time_series.data.synthetic.query(
...     [sin(a), cos(a)], start="2w-ago", end="now",
...     variables={"a": "my_ts_external_id"}, aggregate='interpolation', granularity='1m')
Time series
Warning
- TimeSeries unit support is a new feature:
The API specification is in beta.
The SDK implementation is in alpha.
Unit support is implemented in the TimeSeries APIs through the unit_external_id and unit_quantity parameters in the methods below. Only the use of these arguments is in alpha; calling the methods below without them is stable.
Thus, breaking changes may occur without further notice, see Alpha and Beta Features for more information.
Retrieve a time series by id
- TimeSeriesAPI.retrieve(id: int | None = None, external_id: str | None = None) TimeSeries | None
Retrieve a single time series by id.
- Parameters
id (int | None) – ID
external_id (str | None) – External ID
- Returns
Requested time series or None if it does not exist.
- Return type
TimeSeries | None
Examples
Get time series by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(id=1)
Get time series by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(external_id="1")
Retrieve multiple time series by id
- TimeSeriesAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) TimeSeriesList
Retrieve multiple time series by id.
- Parameters
ids (Sequence[int] | None) – IDs
external_ids (Sequence[str] | None) – External IDs
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested time series.
- Return type
TimeSeriesList
Examples
Get time series by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve_multiple(ids=[1, 2, 3])
Get time series by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve_multiple(external_ids=["abc", "def"])
List time series
- TimeSeriesAPI.list(name: str | None = None, unit: str | None = None, unit_external_id: str | None = None, unit_quantity: str | None = None, is_string: bool | None = None, is_step: bool | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, metadata: dict[str, Any] | None = None, external_id_prefix: str | None = None, created_time: dict[str, Any] | None = None, last_updated_time: dict[str, Any] | None = None, partitions: int | None = None, limit: int | None = 25) TimeSeriesList
-
Fetches time series as they are iterated over, so you keep a limited number of objects in memory.
- Parameters
name (str | None) – Name of the time series. Often referred to as tag.
unit (str | None) – Unit of the time series.
unit_external_id (str | None) – Filter on unit external ID.
unit_quantity (str | None) – Filter on unit quantity.
is_string (bool | None) – Whether the time series is a string time series.
is_step (bool | None) – Whether the time series is a step (piecewise constant) time series.
asset_ids (Sequence[int] | None) – List time series related to these assets.
asset_external_ids (Sequence[str] | None) – List time series related to these assets.
asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.
asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.
data_set_ids (int | Sequence[int] | None) – Return only time series in the specified data set(s) with this id / these ids.
data_set_external_ids (str | Sequence[str] | None) – Return only time series in the specified data set(s) with this external id / these external ids.
metadata (dict[str, Any] | None) – Custom, application specific metadata. String key -> String value
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
created_time (dict[str, Any] | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
last_updated_time (dict[str, Any] | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
partitions (int | None) – Retrieve time series in parallel using this number of workers. Also requires limit=None to be passed.
limit (int | None) – Maximum number of time series to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The requested time series.
- Return type
TimeSeriesList
Examples
List time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.list(limit=5)
Iterate over time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for ts in c.time_series:
...     ts  # do something with the time series
Iterate over chunks of time series to reduce memory load:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for ts_list in c.time_series(chunk_size=2500):
...     ts_list  # do something with the time series
Aggregate time series
- TimeSeriesAPI.aggregate(filter: TimeSeriesFilter | dict | None = None) list[TimeSeriesAggregate]
-
- Parameters
filter (TimeSeriesFilter | dict | None) – Filter on time series with exact match.
- Returns
List of time series aggregates
- Return type
list[TimeSeriesAggregate]
Examples
Aggregate time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.aggregate(filter={"unit": "kpa"})
Aggregate Time Series Count
- TimeSeriesAPI.aggregate_count(advanced_filter: Filter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int
Count of time series matching the specified filters and search.
- Parameters
advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count.
filter (TimeSeriesFilter | dict | None) – The filter to narrow down time series to count requiring exact match.
- Returns
The number of time series matching the specified filters and search.
- Return type
int
Examples
Count the number of time series in your CDF project:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.time_series.aggregate_count()
Count the number of numeric time series in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> is_numeric = filters.Equals(TimeSeriesProperty.is_string, False)
>>> count = c.time_series.aggregate_count(advanced_filter=is_numeric)
Aggregate Time Series Values Cardinality
- TimeSeriesAPI.aggregate_cardinality_values(property: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int
Find the approximate number of distinct values for the given time series property.
- Parameters
property (TimeSeriesProperty | str | list[str]) – The property to count the cardinality of.
advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of different units used for time series in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> unit_count = c.time_series.aggregate_cardinality_values(TimeSeriesProperty.unit)
Count the number of timezones (metadata key) for time series with the word “critical” in the description in your CDF project, but exclude timezones from america:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters, aggregations as aggs
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> not_america = aggs.Not(aggs.Prefix("america"))
>>> is_critical = filters.Search(TimeSeriesProperty.description, "critical")
>>> timezone_count = c.time_series.aggregate_cardinality_values(
...     TimeSeriesProperty.metadata_key("timezone"),
...     advanced_filter=is_critical,
...     aggregate_filter=not_america)
Aggregate Time Series Property Cardinality
- TimeSeriesAPI.aggregate_cardinality_properties(path: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int
Find the approximate number of distinct property paths for time series.
- Parameters
path (TimeSeriesProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of metadata keys in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> key_count = c.time_series.aggregate_cardinality_properties(TimeSeriesProperty.metadata)
Aggregate Time Series Unique Values
- TimeSeriesAPI.aggregate_unique_values(property: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) UniqueResultList
Get unique properties with counts for time series.
- Parameters
property (TimeSeriesProperty | str | list[str]) – The property to group by.
advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.
- Returns
List of unique values of time series matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the timezones (metadata key) with count for your time series in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.metadata_key("timezone"))
>>> print(result.unique)
Get the different units with count used for time series created after 2020-01-01 in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(TimeSeriesProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.unit, advanced_filter=created_after_2020)
>>> print(result.unique)
Get the different units with count for time series updated after 2020-01-01 in your CDF project, but exclude all units that start with “test”:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> updated_after_2020 = filters.Range(TimeSeriesProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.unit, advanced_filter=updated_after_2020, aggregate_filter=not_test)
>>> print(result.unique)
Aggregate Time Series Unique Properties
- TimeSeriesAPI.aggregate_unique_properties(path: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) UniqueResultList
Get unique paths with counts for time series.
- Parameters
path (TimeSeriesProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.
- Returns
List of unique values of time series matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the metadata keys with count for your time series in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> result = c.time_series.aggregate_unique_properties(TimeSeriesProperty.metadata)
Search for time series
- TimeSeriesAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: TimeSeriesFilter | dict | None = None, limit: int = 25) TimeSeriesList
Search for time series. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.
- Parameters
name (str | None) – Prefix and fuzzy search on name.
description (str | None) – Prefix and fuzzy search on description.
query (str | None) – Search on name and description using wildcard search on each of the words (separated by spaces). Retrieves results where at least one word must match. Example: ‘some other’
filter (TimeSeriesFilter | dict | None) – Filter to apply. Performs exact match on these fields.
limit (int) – Max number of results to return.
- Returns
List of requested time series.
- Return type
TimeSeriesList
Examples
Search for a time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.search(name="some name")
Search for all time series connected to asset with id 123:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.search(filter={"asset_ids": [123]})
Create time series
- TimeSeriesAPI.create(time_series: Sequence[TimeSeries]) TimeSeriesList
- TimeSeriesAPI.create(time_series: TimeSeries) TimeSeries
Create one or more time series.
- Parameters
time_series (TimeSeries | Sequence[TimeSeries]) – TimeSeries or list of TimeSeries to create.
- Returns
The created time series.
- Return type
TimeSeries | TimeSeriesList
Examples
Create a new time series:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeries
>>> c = CogniteClient()
>>> ts = c.time_series.create(TimeSeries(name="my_ts", data_set_id=123, external_id="foo"))
Delete time series
- TimeSeriesAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, ignore_unknown_ids: bool = False) None
Delete one or more time series.
- Parameters
id (int | Sequence[int] | None) – Id or list of ids
external_id (str | Sequence[str] | None) – External ID or list of external ids
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
Examples
Delete time series by id or external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.delete(id=[1, 2, 3], external_id="3")
Filter time series
- TimeSeriesAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) TimeSeriesList
-
Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.
- Parameters
filter (Filter | dict) – Filter to apply.
sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. Up to two properties may be given; the default sort order is ascending.
limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
List of time series that match the filter criteria.
- Return type
TimeSeriesList
Examples
Find all numeric time series and return them sorted by external id:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_numeric = f.Equals("is_string", False)
>>> res = c.time_series.filter(filter=is_numeric, sort="external_id")
Note that you can check the API documentation above to see which properties you can filter on with which filters.
To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the TimeSeriesProperty and SortableTimeSeriesProperty enums.
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty, SortableTimeSeriesProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_numeric = f.Equals(TimeSeriesProperty.is_string, False)
>>> res = c.time_series.filter(filter=is_numeric, sort=SortableTimeSeriesProperty.external_id)
Update time series
- TimeSeriesAPI.update(item: Sequence[TimeSeries | TimeSeriesUpdate]) TimeSeriesList
- TimeSeriesAPI.update(item: TimeSeries | TimeSeriesUpdate) TimeSeries
Update one or more time series.
- Parameters
item (TimeSeries | TimeSeriesUpdate | Sequence[TimeSeries | TimeSeriesUpdate]) – Time series to update
- Returns
Updated time series.
- Return type
TimeSeries | TimeSeriesList
Examples
Update a time series that you have fetched. This will perform a full update of the time series:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(id=1)
>>> res.description = "New description"
>>> res = c.time_series.update(res)
Perform a partial update on a time series, updating the description and adding a new field to metadata:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeriesUpdate
>>> c = CogniteClient()
>>> my_update = TimeSeriesUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.time_series.update(my_update)
Upsert time series
- TimeSeriesAPI.upsert(item: Sequence[TimeSeries], mode: Literal['patch', 'replace'] = 'patch') TimeSeriesList
- TimeSeriesAPI.upsert(item: TimeSeries, mode: Literal['patch', 'replace'] = 'patch') TimeSeries
- Upsert time series, i.e., update those that exist and create those that do not.
Note that this is a convenience method: it first calls update on all items, and any that fail because they do not exist are created instead.
For more details, see Upsert.
- Parameters
item (TimeSeries | Sequence[TimeSeries]) – TimeSeries or list of TimeSeries to upsert.
mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the time series are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified. Note replace will skip beta properties (unit_external_id), if you want to replace the beta properties you have to use the update method.
- Returns
The upserted time series.
- Return type
TimeSeries | TimeSeriesList
Examples
Upsert for TimeSeries:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeries
>>> c = CogniteClient()
>>> existing_time_series = c.time_series.retrieve(id=1)
>>> existing_time_series.description = "New description"
>>> new_time_series = TimeSeries(external_id="new_timeSeries", description="New timeSeries")
>>> res = c.time_series.upsert([existing_time_series, new_time_series], mode="replace")
Time Series Data classes
- class cognite.client.data_classes.time_series.SortableTimeSeriesProperty(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.time_series.TimeSeries(id: int | None = None, external_id: str | None = None, name: str | None = None, is_string: bool | None = None, metadata: dict[str, str] | None = None, unit: str | None = None, unit_external_id: str | None = None, asset_id: int | None = None, is_step: bool | None = None, description: str | None = None, security_categories: Sequence[int] | None = None, data_set_id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, legacy_name: str | None = None, cognite_client: CogniteClient | None = None)
Bases:
CogniteResource
A representation of a time series.
- Parameters
id (int | None) – A server-generated ID for the object.
external_id (str | None) – The externally supplied ID for the time series.
name (str | None) – The display short name of the time series. Note: Value of this field can differ from name presented by older versions of API 0.3-0.6.
is_string (bool | None) – Whether the time series is string valued or not.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.
unit (str | None) – The physical unit of the time series.
unit_external_id (str | None) – The physical unit of the time series (reference to unit catalog). Only available for numeric time series.
asset_id (int | None) – Asset ID of equipment linked to this time series.
is_step (bool | None) – Whether the time series is a step series or not.
description (str | None) – Description of the time series.
security_categories (Sequence[int] | None) – The required security categories to access this time series.
data_set_id (int | None) – The dataSet Id for the item.
created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
legacy_name (str | None) – Set a value for legacyName to allow applications using API v0.3, v04, v05, and v0.6 to access this time series. The legacy name is the human-readable name for the time series and is mapped to the name field used in API versions 0.3-0.6. The legacyName field value must be unique, and setting this value to an already existing value will return an error. We recommend that you set this field to the same value as externalId.
cognite_client (CogniteClient | None) – The client to associate with this object.
- asset() Asset
Returns the asset this time series belongs to.
- Returns
The asset given by its asset_id.
- Return type
Asset
- Raises
ValueError – If asset_id is missing.
- count() int
Returns the number of datapoints in this time series.
This result may not be completely accurate, as it is based on aggregates which may be occasionally out of date.
- Returns
The number of datapoints in this time series.
- Return type
int
- Raises
RuntimeError – If the time series is a string time series, as the count aggregate is only supported for numeric data.
- first() Datapoint | None
Returns the first datapoint in this time series. If empty, returns None.
- Returns
A datapoint object containing the value and timestamp of the first datapoint.
- Return type
Datapoint | None
- latest(before: int | str | datetime | None = None) Datapoint | None
Returns the latest datapoint in this time series. If empty, returns None.
- Parameters
before (int | str | datetime | None) – Get the latest datapoint before this time. When omitted, the latest datapoint overall is returned.
- Returns
A datapoint object containing the value and timestamp of the latest datapoint.
- Return type
Datapoint | None
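Example (a hedged sketch tying the convenience methods above together; assumes a numeric time series with the hypothetical external id "my_ts" and a populated asset_id):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ts = c.time_series.retrieve(external_id="my_ts")  # "my_ts" is a hypothetical external id
>>> linked_asset = ts.asset()  # the Asset given by ts.asset_id; raises ValueError if asset_id is missing
>>> n_dps = ts.count()  # approximate datapoint count; numeric time series only
>>> first_dp = ts.first()  # earliest datapoint, or None if the series is empty
>>> last_dp = ts.latest()  # latest datapoint, or None if the series is empty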
- class cognite.client.data_classes.time_series.TimeSeriesAggregate(count: int | None = None, **kwargs: Any)
Bases:
dict
A result of aggregating time series.
- Parameters
count (int | None) – The number of time series matching the applied filter.
**kwargs (Any) – No description.
- class cognite.client.data_classes.time_series.TimeSeriesFilter(name: str | None = None, unit: str | None = None, unit_external_id: str | None = None, unit_quantity: str | None = None, is_string: bool | None = None, is_step: bool | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, external_id_prefix: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None)
Bases:
CogniteFilter
Filtering of time series, with exact match on the given fields.
- Parameters
name (str | None) – Filter on name.
unit (str | None) – Filter on unit.
unit_external_id (str | None) – Filter on unit external ID.
unit_quantity (str | None) – Filter on unit quantity.
is_string (bool | None) – Filter on isString.
is_step (bool | None) – Filter on isStep.
metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.
asset_ids (Sequence[int] | None) – Only include time series that reference these specific asset IDs.
asset_external_ids (Sequence[str] | None) – Asset External IDs of related equipment that this time series relates to.
asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include time series that are related to an asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.
data_set_ids (Sequence[dict[str, Any]] | None) – Only include time series that belong to these data sets.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
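Example (a minimal sketch of building a filter and passing it to TimeSeriesAPI.aggregate, documented above; the unit "kPa" and the prefix "pump_" are illustrative values):
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeriesFilter
>>> c = CogniteClient()
>>> flt = TimeSeriesFilter(unit="kPa", is_string=False, external_id_prefix="pump_")
>>> res = c.time_series.aggregate(filter=flt)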
- class cognite.client.data_classes.time_series.TimeSeriesList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[TimeSeries], IdTransformerMixin
- class cognite.client.data_classes.time_series.TimeSeriesProperty(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.time_series.TimeSeriesUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
Changes will be applied to time series.
- Parameters
id (int | None) – A server-generated ID for the object.
external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.
Sequences
Retrieve a sequence by id
- SequencesAPI.retrieve(id: int | None = None, external_id: str | None = None) Sequence | None
Retrieve a single sequence by id.
- Parameters
id (int | None) – ID
external_id (str | None) – External ID
- Returns
Requested sequence or None if it does not exist.
- Return type
Sequence | None
Examples
Get sequence by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(id=1)
Get sequence by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(external_id="1")
Retrieve multiple sequences by id
- SequencesAPI.retrieve_multiple(ids: SequenceType[int] | None = None, external_ids: SequenceType[str] | None = None, ignore_unknown_ids: bool = False) SequenceList
Retrieve multiple sequences by id.
- Parameters
ids (SequenceType[int] | None) – IDs
external_ids (SequenceType[str] | None) – External IDs
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested sequences.
- Return type
SequenceList
Examples
Get sequences by id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve_multiple(ids=[1, 2, 3])
Get sequences by external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve_multiple(external_ids=["abc", "def"])
List sequences
- SequencesAPI.list(name: str | None = None, external_id_prefix: str | None = None, metadata: dict[str, str] | None = None, asset_ids: SequenceType[int] | None = None, asset_subtree_ids: int | SequenceType[int] | None = None, asset_subtree_external_ids: str | SequenceType[str] | None = None, data_set_ids: int | SequenceType[int] | None = None, data_set_external_ids: str | SequenceType[str] | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, limit: int | None = 25) SequenceList
-
Fetches sequences as they are iterated over, so you keep a limited number of objects in memory.
- Parameters
name (str | None) – Filter out sequences that do not have this exact name.
external_id_prefix (str | None) – Filter out sequences that do not have this string as the start of the externalId
metadata (dict[str, str] | None) – Filter out sequences that do not match these metadata fields and values (case-sensitive). Format is {“key1”:”value1”,”key2”:”value2”}.
asset_ids (SequenceType[int] | None) – Filter out sequences that are not linked to any of these assets.
asset_subtree_ids (int | SequenceType[int] | None) – Asset subtree id or list of asset subtree ids to filter on.
asset_subtree_external_ids (str | SequenceType[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.
data_set_ids (int | SequenceType[int] | None) – Return only sequences in the specified data set(s) with this id / these ids.
data_set_external_ids (str | SequenceType[str] | None) – Return only sequences in the specified data set(s) with this external id / these external ids.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.
limit (int | None) – Max number of sequences to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The requested sequences.
- Return type
SequenceList
Examples
List sequences:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.list(limit=5)
Iterate over sequences:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for seq in c.sequences:
...     seq  # do something with the sequence
Iterate over chunks of sequences to reduce memory load:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for seq_list in c.sequences(chunk_size=2500):
...     seq_list  # do something with the sequences
Aggregate sequences
- SequencesAPI.aggregate(filter: SequenceFilter | dict | None = None) list[SequenceAggregate]
-
- Parameters
filter (SequenceFilter | dict | None) – Filter on sequences with exact match.
- Returns
List of sequence aggregates
- Return type
list[SequenceAggregate]
Examples
Aggregate sequences:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.aggregate(filter={"external_id_prefix": "prefix"})
Aggregate Sequences Count
- SequencesAPI.aggregate_count(advanced_filter: Filter | dict | None = None, filter: SequenceFilter | dict | None = None) int
Count of sequences matching the specified filters and search.
- Parameters
advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count.
filter (SequenceFilter | dict | None) – The filter to narrow down sequences to count requiring exact match.
- Returns
The number of sequences matching the specified filters and search.
- Return type
int
Examples
Count the number of sequences in your CDF project:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_count()
Count the number of sequences with external id prefixed with “mapping:” in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> is_mapping = filters.Prefix(SequenceProperty.external_id, "mapping:")
>>> count = c.sequences.aggregate_count(advanced_filter=is_mapping)
Aggregate Sequences Value Cardinality
- SequencesAPI.aggregate_cardinality_values(property: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) int
Find the approximate number of distinct values for the given sequence property.
- Parameters
property (SequenceProperty | str | list[str]) – The property to count the cardinality of.
advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of different values for the metadata key “efficiency” used for sequences in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_cardinality_values(SequenceProperty.metadata_key("efficiency"))
Count the number of timezones (metadata key) for sequences with the word “critical” in the description in your CDF project, but exclude timezones from america:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters, aggregations as aggs
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> not_america = aggs.Not(aggs.Prefix("america"))
>>> is_critical = filters.Search(SequenceProperty.description, "critical")
>>> timezone_count = c.sequences.aggregate_cardinality_values(
...     SequenceProperty.metadata_key("timezone"),
...     advanced_filter=is_critical,
...     aggregate_filter=not_america)
Aggregate Sequences Property Cardinality
- SequencesAPI.aggregate_cardinality_properties(path: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) int
Find the approximate number of distinct property paths for sequences.
- Parameters
path (SequenceProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.
- Returns
The number of properties matching the specified filters and search.
- Return type
int
Examples
Count the number of different metadata keys in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_cardinality_properties(SequenceProperty.metadata)
Aggregate Sequences Unique Values
- SequencesAPI.aggregate_unique_values(property: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) UniqueResultList
Get unique properties with counts for sequences.
- Parameters
property (SequenceProperty | str | list[str]) – The property to group by.
advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.
- Returns
List of unique values of sequences matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the timezones (metadata key) with count for your sequences in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata_key("timezone"))
>>> print(result.unique)
Get the different metadata keys with count used for sequences created after 2020-01-01 in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(SequenceProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata, advanced_filter=created_after_2020)
>>> print(result.unique)
Get the different metadata keys with count for sequences updated after 2020-01-01 in your CDF project, but exclude all metadata keys that starts with “test”:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> updated_after_2020 = filters.Range(SequenceProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata, advanced_filter=updated_after_2020, aggregate_filter=not_test)
>>> print(result.unique)
Aggregate Sequences Unique Properties
- SequencesAPI.aggregate_unique_properties(path: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) UniqueResultList
Get unique paths with counts for sequences.
- Parameters
path (SequenceProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).
advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.
aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.
filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.
- Returns
List of unique values of sequences matching the specified filters and search.
- Return type
UniqueResultList
Examples
Get the metadata keys with count for your sequences in your CDF project:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> result = c.sequences.aggregate_unique_properties(SequenceProperty.metadata)
Search for sequences
- SequencesAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: SequenceFilter | dict | None = None, limit: int = 25) SequenceList
Search for sequences. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.
- Parameters
name (str | None) – Prefix and fuzzy search on name.
description (str | None) – Prefix and fuzzy search on description.
query (str | None) – Search on name and description using wildcard search on each of the words (separated by spaces). Retrieves results where at least one word must match. Example: ‘some other’
filter (SequenceFilter | dict | None) – Filter to apply. Performs exact match on these fields.
limit (int) – Max number of results to return.
- Returns
The search result as a SequenceList.
- Return type
SequenceList
Examples
Search for a sequence:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.search(name="some name")
Create a sequence
- SequencesAPI.create(sequence: Sequence) Sequence
- SequencesAPI.create(sequence: Sequence[Sequence]) SequenceList
-
- Parameters
sequence (Sequence | SequenceType[Sequence]) – Sequence or list of Sequence to create. The Sequence columns parameter is a list of objects with fields externalId (external id of the column, when omitted, they will be given ids of ‘column0, column1, …’), valueType (data type of the column, either STRING, LONG, or DOUBLE, with default DOUBLE), name, description, metadata (optional fields to describe and store information about the data in the column). Other fields will be removed automatically, so a columns definition from a different sequence object can be passed here.
- Returns
The created sequence(s).
- Return type
Sequence | SequenceList
Examples
Create a new sequence:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> column_def = [
...     {"valueType": "STRING", "externalId": "user", "description": "some description"},
...     {"valueType": "DOUBLE", "externalId": "amount"}]
>>> seq = c.sequences.create(Sequence(external_id="my_sequence", columns=column_def))
Create a new sequence with the same column specifications as an existing sequence:
>>> seq2 = c.sequences.create(Sequence(external_id="my_copied_sequence", columns=seq.columns))
Delete sequences
- SequencesAPI.delete(id: int | SequenceType[int] | None = None, external_id: str | SequenceType[str] | None = None, ignore_unknown_ids: bool = False) None
-
- Parameters
id (int | SequenceType[int] | None) – Id or list of ids
external_id (str | SequenceType[str] | None) – External ID or list of external ids
ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.
Examples
Delete sequences by id or external id:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.delete(id=[1, 2, 3], external_id="3")
Filter sequences
- SequencesAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) SequenceList
-
Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.
- Parameters
filter (Filter | dict) – Filter to apply.
sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. Up to two properties may be given; the default sort order is ascending.
limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
List of sequences that match the filter criteria.
- Return type
SequenceList
Examples
Find all sequences with asset id 123 and metadata key 'type' equal to 'efficiency', and return them sorted by created time:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_asset = f.Equals("asset_id", 123)
>>> is_efficiency = f.Equals(["metadata", "type"], "efficiency")
>>> res = c.sequences.filter(filter=f.And(is_asset, is_efficiency), sort="created_time")
Note that you can check the API documentation above to see which properties you can filter on with which filters.
To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the SequenceProperty and SortableSequenceProperty enums.
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty, SortableSequenceProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_asset = f.Equals(SequenceProperty.asset_id, 123)
>>> is_efficiency = f.Equals(SequenceProperty.metadata_key("type"), "efficiency")
>>> res = c.sequences.filter(filter=f.And(is_asset, is_efficiency),
...     sort=SortableSequenceProperty.created_time)
Update sequences
- SequencesAPI.update(item: Sequence | SequenceUpdate) Sequence
- SequencesAPI.update(item: SequenceType[Sequence | SequenceUpdate]) SequenceList
-
- Parameters
item (Sequence | SequenceUpdate | SequenceType[Sequence | SequenceUpdate]) – Sequences to update
- Returns
Updated sequences.
- Return type
Sequence | SequenceList
Examples
Update a sequence that you have fetched. This will perform a full update of the sequences:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(id=1)
>>> res.description = "New description"
>>> res = c.sequences.update(res)
Perform a partial update on a sequence, updating the description and adding a new field to metadata:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> my_update = SequenceUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.sequences.update(my_update)
Updating column definitions
Currently, updating the column definitions of a sequence is only supported through partial update, using add, remove and modify methods on the columns property.
Add a single new column:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> my_update = SequenceUpdate(id=1).columns.add({"valueType": "STRING", "externalId": "user", "description": "some description"})
>>> res = c.sequences.update(my_update)
Add multiple new columns:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> column_def = [
...     {"valueType": "STRING", "externalId": "user", "description": "some description"},
...     {"valueType": "DOUBLE", "externalId": "amount"}]
>>> my_update = SequenceUpdate(id=1).columns.add(column_def)
>>> res = c.sequences.update(my_update)
Remove a single column:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> my_update = SequenceUpdate(id=1).columns.remove("col_external_id1")
>>> res = c.sequences.update(my_update)
Remove multiple columns:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> my_update = SequenceUpdate(id=1).columns.remove(["col_external_id1", "col_external_id2"])
>>> res = c.sequences.update(my_update)
Update existing columns:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate, SequenceColumnUpdate
>>> c = CogniteClient()
>>> column_updates = [
...     SequenceColumnUpdate(external_id="col_external_id_1").external_id.set("new_col_external_id"),
...     SequenceColumnUpdate(external_id="col_external_id_2").description.set("my new description")]
>>> my_update = SequenceUpdate(id=1).columns.modify(column_updates)
>>> res = c.sequences.update(my_update)
Upsert sequences
- SequencesAPI.upsert(item: Sequence[Sequence], mode: Literal['patch', 'replace'] = 'patch') SequenceList
- SequencesAPI.upsert(item: Sequence, mode: Literal['patch', 'replace'] = 'patch') Sequence
- Upsert sequences, i.e., update those that exist and create those that do not.
Note that this is a convenience method: it first calls update on all items, and any that fail because they do not exist are created instead.
For more details, see Upsert.
- Parameters
item (Sequence | SequenceType[Sequence]) – Sequence or list of sequences to upsert.
mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the sequences are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified.
- Returns
The upserted sequence(s).
- Return type
Sequence | SequenceList
Examples
Upsert for sequences:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> existing_sequence = c.sequences.retrieve(id=1)
>>> existing_sequence.description = "New description"
>>> new_sequence = Sequence(external_id="new_sequence", description="New sequence")
>>> res = c.sequences.upsert([existing_sequence, new_sequence], mode="replace")
Retrieve data
- SequencesDataAPI.retrieve(start: int, end: int | None, column_external_ids: SequenceType[str] | None = None, external_id: str | SequenceType[str] | None = None, id: int | SequenceType[int] | None = None, limit: int | None = None) SequenceData | SequenceDataList
-
- Parameters
start (int) – Row number to start from (inclusive).
end (int | None) – Upper limit on the row number (exclusive). Set to None or -1 to get all rows until end of sequence.
column_external_ids (SequenceType[str] | None) – List of external id for the columns of the sequence. If ‘None’ is passed, all columns will be retrieved.
external_id (str | SequenceType[str] | None) – External id of sequence.
id (int | SequenceType[int] | None) – Id of sequence.
limit (int | None) – Maximum number of rows to return per sequence. Pass None to fetch all (possibly limited by ‘end’).
- Returns
SequenceData if single identifier was given, else SequenceDataList
- Return type
SequenceData | SequenceDataList
Examples
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.data.retrieve(id=1, start=0, end=None)
>>> tuples = [(r, v) for r, v in res.items()]  # you can use this iterator in for loops and list comprehensions,
>>> single_value = res[23]  # ... get the values at a single row number,
>>> col = res.get_column(external_id='columnExtId')  # ... get the array of values for a specific column,
>>> df = res.to_pandas()  # ... or convert the result to a pandas dataframe
Retrieve pandas dataframe
- SequencesDataAPI.retrieve_dataframe(start: int, end: int | None, column_external_ids: list[str] | None = None, external_id: str | None = None, column_names: str | None = None, id: int | None = None, limit: int | None = None) pandas.DataFrame
Retrieve data from a sequence as a pandas dataframe
- Parameters
start (int) – (inclusive) row number to start from.
end (int | None) – (exclusive) upper limit on the row number. Set to None or -1 to get all rows until end of sequence.
column_external_ids (list[str] | None) – List of external id for the columns of the sequence. If ‘None’ is passed, all columns will be retrieved.
external_id (str | None) – External id of sequence.
column_names (str | None) – Which field(s) to use as column header. Can use “externalId”, “id”, “columnExternalId”, “id|columnExternalId” or “externalId|columnExternalId”. Default is “externalId|columnExternalId” for queries on more than one sequence, and “columnExternalId” for queries on a single sequence.
id (int | None) – Id of sequence
limit (int | None) – Maximum number of rows to return per sequence.
- Returns
The requested sequence data as a pandas DataFrame.
- Return type
pandas.DataFrame
Examples
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> df = c.sequences.data.retrieve_dataframe(id=1, start=0, end=None)
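A sketch of selecting specific columns and capping the number of rows, per the parameters above (the external ids are assumptions):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> # Fetch at most 100 rows of the single column "col_a" from a hypothetical sequence
>>> df = c.sequences.data.retrieve_dataframe(
...     external_id="my_sequence", start=0, end=None,
...     column_external_ids=["col_a"], limit=100)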
Insert rows into a sequence
- SequencesDataAPI.insert(rows: dict[int, SequenceType[int | float | str]] | SequenceType[tuple[int, SequenceType[int | float | str]]] | SequenceType[dict[str, Any]] | SequenceData, column_external_ids: SequenceType[str] | None, id: int | None = None, external_id: str | None = None) None
-
- Parameters
rows (dict[int, SequenceType[int | float | str]] | SequenceType[tuple[int, SequenceType[int | float | str]]] | SequenceType[dict[str, Any]] | SequenceData) – The rows you wish to insert. Can either be a list of tuples, a list of {“rowNumber”:… ,”values”: …} objects, a dictionary of rowNumber: data, or a SequenceData object. See examples below.
column_external_ids (SequenceType[str] | None) – List of external ids for the columns of the sequence.
id (int | None) – Id of sequence to insert rows into.
external_id (str | None) – External id of sequence to insert rows into.
Examples
Your rows of data can be a list of tuples, where the first element is the row number and the second element is the data to be inserted:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> seq = c.sequences.create(Sequence(columns=[{"valueType": "STRING", "externalId": "col_a"},
...                                            {"valueType": "DOUBLE", "externalId": "col_b"}]))
>>> data = [(1, ['pi', 3.14]), (2, ['e', 2.72])]
>>> c.sequences.data.insert(column_external_ids=["col_a", "col_b"], rows=data, id=seq.id)
They can also be provided as a list of API-style objects with a rowNumber and values field:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = [{"rowNumber": 123, "values": ['str', 3]}, {"rowNumber": 456, "values": ["bar", 42]}]
>>> c.sequences.data.insert(data, id=1, column_external_ids=["col_a", "col_b"])
Or they can be given as a dictionary with the row number as key and the data to be inserted at that row as value:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = {123: ['str', 3], 456: ['bar', 42]}
>>> c.sequences.data.insert(column_external_ids=['stringColumn', 'intColumn'], rows=data, id=1)
Finally, they can be a SequenceData object retrieved from another request. In this case, the column external ids from that object are used as well.
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = c.sequences.data.retrieve(id=2, start=0, end=10)
>>> c.sequences.data.insert(rows=data, id=1, column_external_ids=None)
Insert a pandas dataframe into a sequence
- SequencesDataAPI.insert_dataframe(dataframe: pandas.DataFrame, id: int | None = None, external_id: str | None = None) None
-
The index of the dataframe must contain the row numbers. The names of the remaining columns specify the column external ids. The sequence and columns must already exist.
- Parameters
dataframe (pandas.DataFrame) – Pandas DataFrame object containing the sequence data.
id (int | None) – Id of sequence to insert rows into.
external_id (str | None) – External id of sequence to insert rows into.
Examples
Multiply data in the sequence by 2:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> df = c.sequences.data.retrieve_dataframe(id=123, start=0, end=None)
>>> c.sequences.data.insert_dataframe(df * 2, id=123)
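Since the dataframe index supplies the row numbers and the column names supply the column external ids, a dataframe can also be built from scratch. A minimal sketch, assuming a sequence with columns "col_a" and "col_b" already exists:
>>> import pandas as pd
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> # Index [1, 2] becomes the row numbers; column names map to column external ids
>>> df = pd.DataFrame({"col_a": ["pi", "e"], "col_b": [3.14, 2.72]}, index=[1, 2])
>>> c.sequences.data.insert_dataframe(df, external_id="my_sequence")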
Delete rows from a sequence
- SequencesDataAPI.delete(rows: SequenceType[int], id: int | None = None, external_id: str | None = None) None
-
- Parameters
rows (SequenceType[int]) – List of row numbers.
id (int | None) – Id of sequence to delete rows from.
external_id (str | None) – External id of sequence to delete rows from.
Examples
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.data.delete(id=1, rows=[1, 2, 42])
Delete a range of rows from a sequence
- SequencesDataAPI.delete_range(start: int, end: int | None, id: int | None = None, external_id: str | None = None) None
-
- Parameters
start (int) – Row number to start from (inclusive).
end (int | None) – Upper limit on the row number (exclusive). Set to None or -1 to delete all rows until end of sequence.
id (int | None) – Id of sequence to delete rows from.
external_id (str | None) – External id of sequence to delete rows from.
Examples
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.data.delete_range(id=1, start=0, end=None)
Sequence Data classes
- class cognite.client.data_classes.sequences.Sequence(id: int | None = None, name: str | None = None, description: str | None = None, asset_id: int | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, columns: SequenceType[dict[str, Any]] | None = None, created_time: int | None = None, last_updated_time: int | None = None, data_set_id: int | None = None, cognite_client: CogniteClient | None = None)
Information about the sequence stored in the database
- Parameters
id (int | None) – Unique Cognite-provided identifier for the sequence
name (str | None) – Name of the sequence
description (str | None) – Description of the sequence
asset_id (int | None) – Optional asset this sequence is associated with
external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.
metadata (dict[str, Any] | None) – Custom, application specific metadata. String key -> String value. Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.
columns (SequenceType[dict[str, Any]] | None) – List of column definitions
created_time (int | None) – Time when this sequence was created in CDF in milliseconds since Jan 1, 1970.
last_updated_time (int | None) – The last time this sequence was updated in CDF, in milliseconds since Jan 1, 1970.
data_set_id (int | None) – Data set that this sequence belongs to
cognite_client (CogniteClient | None) – The client to associate with this object.
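For illustration, a minimal sketch of defining a sequence with typed columns (the external ids and names here are hypothetical); columns are given as dicts with a valueType and an externalId:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> seq = Sequence(
...     external_id="pump_readings",  # hypothetical external id
...     name="Pump readings",
...     columns=[{"valueType": "DOUBLE", "externalId": "pressure"},
...              {"valueType": "STRING", "externalId": "status"}])
>>> created = c.sequences.create(seq)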
- class cognite.client.data_classes.sequences.SequenceAggregate(count: int | None = None, **kwargs: Any)
Bases:
dict
No description.
- Parameters
count (int | None) – No description.
**kwargs (Any) – No description.
- class cognite.client.data_classes.sequences.SequenceColumnUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
No description.
- Parameters
id (int) – A server-generated ID for the object.
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
- class cognite.client.data_classes.sequences.SequenceData(id: int | None = None, external_id: str | None = None, rows: SequenceType[dict] | None = None, row_numbers: SequenceType[int] | None = None, values: SequenceType[SequenceType[int | str | float]] | None = None, columns: SequenceType[dict[str, Any]] | None = None)
Bases:
CogniteResource
An object representing a list of rows from a sequence.
- Parameters
id (int | None) – Id of the sequence the data belong to
external_id (str | None) – External id of the sequence the data belong to
rows (SequenceType[dict] | None) – Combined row numbers and row data object from the API. If you pass this, row_numbers/values are ignored.
row_numbers (SequenceType[int] | None) – The data row numbers.
values (SequenceType[SequenceType[int | str | float]] | None) – The data values, one row at a time.
columns (SequenceType[dict[str, Any]] | None) – The column information, in the format returned by the API.
- property column_external_ids: list[str]
Retrieves list of column external ids for the sequence, for use in e.g. data retrieve or insert methods.
- Returns
List of sequence column external ids.
- Return type
list[str]
- property column_value_types: list[str]
Retrieves list of column value types.
- Returns
List of column value types
- Return type
list[str]
- dump(camel_case: bool = False) dict[str, Any]
Dump the sequence data into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representing the instance.
- Return type
dict[str, Any]
- get_column(external_id: str) list[int | str | float]
Get a column by external_id.
- Parameters
external_id (str) – External id of the column.
- Returns
A list of values for that column in the sequence
- Return type
list[int | str | float]
- items() Iterator[tuple[int, list[int | str | float]]]
Returns an iterator over tuples of (row number, values).
- to_pandas(column_names: str = 'columnExternalId') pandas.DataFrame
Convert the sequence data into a pandas DataFrame.
- Parameters
column_names (str) – Which field(s) to use as column header. Can use “externalId”, “id”, “columnExternalId”, “id|columnExternalId” or “externalId|columnExternalId”.
- Returns
The dataframe.
- Return type
pandas.DataFrame
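A small sketch of constructing a SequenceData object directly from row numbers and values (all identifiers hypothetical) and reading a column back:
>>> from cognite.client.data_classes import SequenceData
>>> data = SequenceData(
...     external_id="my_sequence",
...     row_numbers=[0, 1],
...     values=[["pi", 3.14], ["e", 2.72]],  # one list of values per row
...     columns=[{"externalId": "col_a", "valueType": "STRING"},
...              {"externalId": "col_b", "valueType": "DOUBLE"}])
>>> data.get_column("col_b")
[3.14, 2.72]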
- class cognite.client.data_classes.sequences.SequenceDataList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[SequenceData]
- to_pandas(column_names: str = 'externalId|columnExternalId') pandas.DataFrame
Convert the sequence data list into a pandas DataFrame. Each column will be a sequence.
- Parameters
column_names (str) – Which field to use as column header. Can use any combination of “externalId”, “columnExternalId”, “id” and other characters as a template.
- Returns
The sequence data list as a pandas DataFrame.
- Return type
pandas.DataFrame
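Since retrieve returns a SequenceDataList when given multiple ids, converting the whole batch to a single dataframe might look like this sketch (the ids are assumptions):
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.data.retrieve(id=[1, 2], start=0, end=None)
>>> # Each column header combines the sequence external id and the column external id
>>> df = res.to_pandas(column_names="externalId|columnExternalId")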
- class cognite.client.data_classes.sequences.SequenceFilter(name: str | None = None, external_id_prefix: str | None = None, metadata: dict[str, Any] | None = None, asset_ids: SequenceType[int] | None = None, asset_subtree_ids: SequenceType[dict[str, Any]] | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, data_set_ids: SequenceType[dict[str, Any]] | None = None)
Bases:
CogniteFilter
No description.
- Parameters
name (str | None) – Return only sequences with this exact name.
external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
metadata (dict[str, Any] | None) – Filter the sequences by metadata fields and values (case-sensitive). Format is {“key1”:”value1”,”key2”:”value2”}.
asset_ids (SequenceType[int] | None) – Return only sequences linked to one of the specified assets.
asset_subtree_ids (SequenceType[dict[str, Any]] | None) – Only include sequences that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.
created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
data_set_ids (SequenceType[dict[str, Any]] | None) – Only include sequences that belong to these datasets.
- class cognite.client.data_classes.sequences.SequenceList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[Sequence], IdTransformerMixin
- class cognite.client.data_classes.sequences.SequenceProperty(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.sequences.SequenceUpdate(id: int | None = None, external_id: str | None = None)
Bases:
CogniteUpdate
No description.
- Parameters
id (int) – A server-generated ID for the object.
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
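A minimal sketch of a partial update via SequenceUpdate, assuming the description attribute supports set as other CogniteUpdate fields do:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> # Only the description is changed; all other fields stay as they are
>>> upd = SequenceUpdate(id=1).description.set("New description")
>>> res = c.sequences.update(upd)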
- class cognite.client.data_classes.sequences.SortableSequenceProperty(value)
Bases:
EnumProperty
An enumeration.
Data Point Subscriptions
Warning
- DataPoint Subscription is a new feature:
The API specification is in beta.
The SDK implementation is in alpha.
Thus, breaking changes may occur without further notice; see Alpha and Beta Features for more information.
Create data point subscriptions
- DatapointsSubscriptionAPI.create(subscription: DataPointSubscriptionCreate) DatapointSubscription
-
Create a subscription that can be used to listen for changes in data points for a set of time series.
- Parameters
subscription (DataPointSubscriptionCreate) – Subscription to create.
- Returns
Created subscription
- Return type
DatapointSubscription
Examples
Create a subscription with explicit time series IDs:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionCreate
>>> c = CogniteClient()
>>> sub = DataPointSubscriptionCreate("mySubscription", partition_count=1,
...                                   time_series_ids=["myFirstTimeSeries", "mySecondTimeSeries"],
...                                   name="My subscription")
>>> created = c.time_series.subscriptions.create(sub)
Create a filter defined subscription for all numeric time series:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionCreate
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.datapoints_subscriptions import DatapointSubscriptionFilterProperties
>>> c = CogniteClient()
>>> f = filters
>>> p = DatapointSubscriptionFilterProperties
>>> numeric_timeseries = f.Equals(p.is_string, False)
>>> sub = DataPointSubscriptionCreate("mySubscription", partition_count=1,
...                                   filter=numeric_timeseries,
...                                   name="My subscription for numeric time series")
>>> created = c.time_series.subscriptions.create(sub)
Retrieve a data point subscription by id(s)
- DatapointsSubscriptionAPI.retrieve(external_id: str, ignore_unknown_ids: bool = False) DatapointSubscription | None
Retrieve one subscription by external ID.
- Parameters
external_id (str) – External ID of the subscription to retrieve.
ignore_unknown_ids (bool) – Whether to ignore IDs and external IDs that are not found rather than throw an exception.
- Returns
The requested subscription.
- Return type
DatapointSubscription | None
Examples
Retrieve a subscription by external ID:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.subscriptions.retrieve("my_subscription")
List data point subscriptions
- DatapointsSubscriptionAPI.list(limit: int | None = 25) DatapointSubscriptionList
-
- Parameters
limit (int | None) – Maximum number of subscriptions to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
List of requested datapoint subscriptions
- Return type
DatapointSubscriptionList
Examples
List 5 subscriptions:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> subscriptions = c.time_series.subscriptions.list(limit=5)
List member time series of subscription
- DatapointsSubscriptionAPI.list_member_time_series(external_id: str, limit: int | None = 25) TimeSeriesIDList
List the time series in a subscription.
Retrieve a list of time series (IDs) that the subscription is currently retrieving updates from.
- Parameters
external_id (str) – External ID of the subscription to retrieve members of.
limit (int | None) – Maximum number of time series to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
List of time series in the subscription.
- Return type
TimeSeriesIDList
Examples
List time series in a subscription:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> members = c.time_series.subscriptions.list_member_time_series("my_subscription")
>>> timeseries_external_ids = members.as_external_ids()
Iterate over subscriptions data
- DatapointsSubscriptionAPI.iterate_data(external_id: str, start: str | None = None, limit: int = 25, partition: int = 0, cursor: str | None = None) Iterator[DatapointSubscriptionBatch]
Iterate over data from a given subscription.
The data can be ingested data points as well as time ranges in which data has been deleted. This endpoint also returns changes to the subscription itself, that is, whether time series have been added to or removed from the subscription.
Warning
This endpoint will store updates from when the subscription was created, but updates older than 7 days may be discarded.
- Parameters
external_id (str) – The external ID of the subscription.
start (str | None) – When to start the iteration. If set to None, the iteration will start from the beginning. The format is “N[timeunit]-ago”, where timeunit is w,d,h,m (week, day, hour, minute). For example, “12h-ago” will start the iteration from 12 hours ago. You can also set it to “now” to jump straight to the end. Defaults to None.
limit (int) – Approximate number of results to return across all partitions.
partition (int) – The partition to iterate over. Defaults to 0.
cursor (str | None) – Optional cursor to start iterating from.
- Yields
DatapointSubscriptionBatch – Changes to the subscription and data in the subscribed time series.
Examples
Iterate over changes to subscription timeseries since the beginning until there is no more data:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for batch in c.time_series.subscriptions.iterate_data("my_subscription"):
...     print(f"Added {len(batch.subscription_changes.added)} timeseries")
...     print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
...     print(f"Changed timeseries data in {len(batch.updates)} updates")
...     if not batch.has_next:
...         break
Iterate continuously over all changes to the subscription newer than 3 days:
>>> import time
>>> for batch in c.time_series.subscriptions.iterate_data("my_subscription", "3d-ago"):
...     print(f"Added {len(batch.subscription_changes.added)} timeseries")
...     print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
...     print(f"Changed timeseries data in {len(batch.updates)} updates")
...     if not batch.has_next:
...         time.sleep(1)
Update data point subscription
- DatapointsSubscriptionAPI.update(update: DataPointSubscriptionUpdate) DatapointSubscription
-
Update a subscription. Note that fields not included in the request are left unchanged. Furthermore, the partition count of a subscription cannot be changed.
- Parameters
update (DataPointSubscriptionUpdate) – The subscription update.
- Returns
Updated subscription.
- Return type
DatapointSubscription
Examples
Change the name of a preexisting subscription:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> update = DataPointSubscriptionUpdate("my_subscription").name.set("My New Name")
>>> updated = c.time_series.subscriptions.update(update)
Add a time series to a preexisting subscription:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> update = DataPointSubscriptionUpdate("my_subscription").time_series_ids.add(["MyNewTimeSeriesExternalId"])
>>> updated = c.time_series.subscriptions.update(update)
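Conversely, removing a time series might look like this sketch, assuming the time_series_ids list attribute also supports remove as other list update attributes do:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> # Assumption: list update attributes expose remove() alongside add() and set()
>>> update = DataPointSubscriptionUpdate("my_subscription").time_series_ids.remove(["MyNewTimeSeriesExternalId"])
>>> updated = c.time_series.subscriptions.update(update)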
Delete data point subscription
- DatapointsSubscriptionAPI.delete(external_id: str | Sequence[str], ignore_unknown_ids: bool = False) None
Delete subscription(s). This operation cannot be undone.
- Parameters
external_id (str | Sequence[str]) – External ID or list of external IDs of subscriptions to delete.
ignore_unknown_ids (bool) – Whether to ignore IDs and external IDs that are not found rather than throw an exception.
Examples
Delete a subscription by external ID:
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.subscriptions.delete("my_subscription")
Data Point Subscription classes
- class cognite.client.data_classes.datapoints_subscriptions.DataDeletion(inclusive_begin: 'int', exclusive_end: 'int | None')
Bases:
object
- class cognite.client.data_classes.datapoints_subscriptions.DataPointSubscriptionCreate(external_id: str, partition_count: int, time_series_ids: list[ExternalId] | None = None, filter: Filter | None = None, name: str | None = None, description: str | None = None)
Bases:
DatapointSubscriptionCore
- A data point subscription is a way to listen to changes to time series data points, in ingestion order.
This is the write version of a subscription, used to create new subscriptions.
A subscription can either be defined directly by a list of time series ids or indirectly by a filter.
- Parameters
external_id (str) – Externally provided ID for the subscription. Must be unique.
partition_count (int) – The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead. The partition count must be between 1 and 100. CAVEAT: This cannot change after the subscription has been created.
time_series_ids (list[ExternalId] | None) – List of (external) ids of time series that this subscription will listen to. Not compatible with filter.
filter (Filter | None) – A filter DSL (Domain Specific Language) to define advanced filter queries. Not compatible with time_series_ids.
name (str | None) – No description.
description (str | None) – A summary explanation for the subscription.
- class cognite.client.data_classes.datapoints_subscriptions.DataPointSubscriptionUpdate(external_id: str)
Bases:
CogniteUpdate
Changes applied to datapoint subscription
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
- class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscription(external_id: ExternalId, partition_count: int, created_time: int, last_updated_time: int, time_series_count: int, filter: Filter | None = None, name: str | None = None, description: str | None = None, **_: Any)
Bases:
DatapointSubscriptionCore
- A data point subscription is a way to listen to changes to time series data points, in ingestion order.
This is the read version of a subscription, used when reading subscriptions from CDF.
- Parameters
external_id (ExternalId) – Externally provided ID for the subscription. Must be unique.
partition_count (int) – The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead.
created_time (int) – Time when the subscription was created in CDF in milliseconds since Jan 1, 1970.
last_updated_time (int) – Time when the subscription was last updated in CDF in milliseconds since Jan 1, 1970.
time_series_count (int) – The number of time series in the subscription.
filter (Filter | None) – If present, the subscription is defined by this filter.
name (str | None) – No description.
description (str | None) – A summary explanation for the subscription.
**_ (Any) – No description.
- class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionBatch(updates: 'list[DatapointsUpdate]', subscription_changes: 'SubscriptionTimeSeriesUpdate', has_next: 'bool', cursor: 'str')
Bases:
object
- class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionFilterProperties(value)
Bases:
EnumProperty
An enumeration.
- class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
- class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionPartition(index: 'int', cursor: 'str | None' = None)
Bases:
object
- class cognite.client.data_classes.datapoints_subscriptions.DatapointsUpdate(time_series: 'TimeSeriesID', upserts: 'Datapoints', deletes: 'list[DataDeletion]')
Bases:
object
- class cognite.client.data_classes.datapoints_subscriptions.SubscriptionTimeSeriesUpdate(added: 'list[TimeSeriesID]', removed: 'list[TimeSeriesID]')
Bases:
object
- class cognite.client.data_classes.datapoints_subscriptions.TimeSeriesID(id: int, external_id: ExternalId | None = None)
Bases:
CogniteResource
A TimeSeries Identifier to uniquely identify a time series.
- Parameters
id (int) – A server-generated ID for the object.
external_id (ExternalId | None) – The external ID provided by the client. Must be unique for the resource type.
- dump(camel_case: bool = False) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to False.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.datapoints_subscriptions.TimeSeriesIDList(resources: Collection[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList[TimeSeriesID], IdTransformerMixin