Core Data Model

Assets

Retrieve an asset by id

AssetsAPI.retrieve(id: int | None = None, external_id: str | None = None) Asset | None

Retrieve a single asset by id.

Parameters
  • id (int | None) – ID

  • external_id (str | None) – External ID

Returns

Requested asset or None if it does not exist.

Return type

Asset | None

Examples

Get asset by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve(id=1)

Get asset by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve(external_id="1")

Retrieve multiple assets by id

AssetsAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) AssetList

Retrieve multiple assets by id.

Parameters
  • ids (Sequence[int] | None) – IDs

  • external_ids (Sequence[str] | None) – External IDs

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested assets.

Return type

AssetList

Examples

Get assets by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve_multiple(ids=[1, 2, 3])

Get assets by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.retrieve_multiple(external_ids=["abc", "def"], ignore_unknown_ids=True)

Retrieve an asset subtree

AssetsAPI.retrieve_subtree(id: int | None = None, external_id: str | None = None, depth: int | None = None) AssetList

Retrieve the subtree for this asset up to a specified depth.

Parameters
  • id (int | None) – Id of the root asset in the subtree.

  • external_id (str | None) – External id of the root asset in the subtree.

  • depth (int | None) – Retrieve assets up to this depth below the root asset in the subtree. Omit to get the entire subtree.

Returns

The requested assets or an empty AssetList if the asset does not exist.

Return type

AssetList
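
Examples

Get the subtree of an asset down to a given depth (the id and depth used here are illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> subtree = c.assets.retrieve_subtree(id=1, depth=2)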

List assets

AssetsAPI.list(name: str | None = None, parent_ids: Sequence[int] | None = None, parent_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, metadata: dict[str, str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, root: bool | None = None, external_id_prefix: str | None = None, aggregated_properties: Sequence[str] | None = None, partitions: int | None = None, limit: int | None = 25) AssetList

List assets

Parameters
  • name (str | None) – Name of asset. Often referred to as tag.

  • parent_ids (Sequence[int] | None) – Return only the direct descendants of the specified assets.

  • parent_external_ids (Sequence[str] | None) – Return only the direct descendants of the specified assets.

  • asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.

  • asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.

  • data_set_ids (int | Sequence[int] | None) – Return only assets in the specified data set(s) with this id / these ids.

  • data_set_external_ids (str | Sequence[str] | None) – Return only assets in the specified data set(s) with this external id / these external ids.

  • labels (LabelFilter | None) – Return only the assets matching the specified label filter.

  • geo_location (GeoLocationFilter | None) – Only include assets matching the specified geographic relation.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value.

  • source (str | None) – The source of this asset.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • root (bool | None) – Whether the filtered assets are root assets, or not. Set to True to only list root assets.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • aggregated_properties (Sequence[str] | None) – Set of aggregated properties to include.

  • partitions (int | None) – Retrieve assets in parallel using this number of workers. Also requires limit=None to be passed. To prevent unexpected problems and to maximize read throughput, the API documentation recommends using at most 10 partitions. When using more than 10 partitions, actual throughput decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.

  • limit (int | None) – Maximum number of assets to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns

List of requested assets

Return type

AssetList

Examples

List assets:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset_list = c.assets.list(limit=5)

Iterate over assets:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for asset in c.assets:
...     asset # do something with the asset

Iterate over chunks of assets to reduce memory load:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for asset_list in c.assets(chunk_size=2500):
...     asset_list # do something with the assets

Filter assets based on labels:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["PUMP", "VERIFIED"])
>>> asset_list = c.assets.list(labels=my_label_filter)
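
List all assets in parallel using partitions (note that limit must be None; the partition count shown here is illustrative, the API documentation recommends at most 10):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset_list = c.assets.list(partitions=10, limit=None)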

Aggregate assets

AssetsAPI.aggregate(filter: AssetFilter | dict | None = None) list[AssetAggregate]

Aggregate assets

Parameters

filter (AssetFilter | dict | None) – Filter on assets with strict matching.

Returns

List of asset aggregates

Return type

list[AssetAggregate]

Examples

Aggregate assets:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate(filter={"external_id_prefix": "prefix"})

Aggregate asset metadata keys

AssetsAPI.aggregate_metadata_keys(filter: AssetFilter | dict | None = None) Sequence[AggregateBucketResult]

Aggregate asset metadata keys

Note

In the case of text fields, the values are aggregated in a case-insensitive manner

Parameters

filter (AssetFilter | dict | None) – Filter on assets with strict matching.

Returns

List of aggregated metadata keys with counts

Return type

Sequence[AggregateBucketResult]

Examples

Aggregate metadata keys of assets:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate_metadata_keys(filter={"external_id_prefix": "prefix"})

Aggregate asset metadata values

AssetsAPI.aggregate_metadata_values(keys: Sequence[str], filter: AssetFilter | dict | None = None) Sequence[AggregateBucketResult]

Aggregate asset metadata values

Note

In the case of text fields, the values are aggregated in a case-insensitive manner

Parameters
  • keys (Sequence[str]) – Metadata key(s) to apply the aggregation on. Currently supports exactly one key per request.

  • filter (AssetFilter | dict | None) – Filter on assets with strict matching.

Returns

List of aggregated metadata values with counts

Return type

Sequence[AggregateBucketResult]

Examples

Aggregate metadata values of assets:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_by_prefix = c.assets.aggregate_metadata_values(
...     keys=["someKey"],
...     filter={"external_id_prefix": "prefix"}
... )

Aggregate Asset Count

AssetsAPI.aggregate_count(property: AssetPropertyLike | None = None, advanced_filter: Filter | dict | None = None, filter: AssetFilter | dict | None = None) int

Count of assets matching the specified filters.

Parameters
  • property (AssetPropertyLike | None) – If specified, get an approximate number of assets with a specific property (property is not null) that match the filters.

  • advanced_filter (Filter | dict | None) – The advanced filter to narrow down the assets to count.

  • filter (AssetFilter | dict | None) – The filter to narrow down the assets to count (strict matching).

Returns

The number of assets matching the specified filters.

Return type

int

Examples

Count the number of assets in your CDF project:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.assets.aggregate_count()

Count the number of assets with the metadata key “timezone” in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> has_timezone = filters.ContainsAny(AssetProperty.metadata, "timezone")
>>> asset_count = c.assets.aggregate_count(advanced_filter=has_timezone)

Aggregate Asset Value Cardinality

AssetsAPI.aggregate_cardinality_values(property: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) int

Find the approximate number of distinct values of a property for assets.

Parameters
  • property (AssetPropertyLike) – The property to count the cardinality of.

  • advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).

Returns

The approximate number of distinct property values matching the specified filters.

Return type

int

Examples

Count the number of labels used by assets in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> label_count = c.assets.aggregate_cardinality_values(AssetProperty.labels)

Count the number of timezones (metadata key) for assets with the word “critical” in the description in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> is_critical = filters.Search(AssetProperty.description, "critical")
>>> timezone_count = c.assets.aggregate_cardinality_values(AssetProperty.metadata_key("timezone"), advanced_filter=is_critical)

Aggregate Asset Property Cardinality

AssetsAPI.aggregate_cardinality_properties(path: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) int

Find the approximate number of distinct property paths (e.g. metadata keys) for assets.

Parameters
  • path (AssetPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).

Returns

The approximate number of distinct property paths matching the specified filters.

Return type

int

Examples

Count the number of unique metadata keys used by assets in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> key_count = c.assets.aggregate_cardinality_properties(AssetProperty.metadata)

Aggregate Asset Unique Values

AssetsAPI.aggregate_unique_values(property: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) UniqueResultList

Get unique properties with counts for assets.

Parameters
  • property (AssetPropertyLike) – The property to group by.

  • advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).

Returns

List of unique values of assets matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the timezones (metadata key) with count for your assets in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> result = c.assets.aggregate_unique_values(AssetProperty.metadata_key("timezone"))
>>> print(result.unique)

Get the different labels with count used for assets created after 2020-01-01 in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(AssetProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.assets.aggregate_unique_values(AssetProperty.labels, advanced_filter=created_after_2020)
>>> print(result.unique)

Get the different labels with count for assets updated after 2020-01-01 in your CDF project, but exclude all labels that start with “test”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> created_after_2020 = filters.Range(AssetProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.assets.aggregate_unique_values(AssetProperty.labels, advanced_filter=created_after_2020, aggregate_filter=not_test)
>>> print(result.unique)

Aggregate Asset Unique Properties

AssetsAPI.aggregate_unique_properties(path: AssetPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: AssetFilter | dict | None = None) UniqueResultList

Get unique paths with counts for assets.

Parameters
  • path (AssetPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The advanced filter to narrow down assets.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (AssetFilter | dict | None) – The filter to narrow down assets (strict matching).

Returns

List of unique values of assets matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the metadata keys with counts for your assets in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.assets import AssetProperty
>>> c = CogniteClient()
>>> result = c.assets.aggregate_unique_properties(AssetProperty.metadata)

Search for assets

AssetsAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: AssetFilter | dict | None = None, limit: int = 25) AssetList

Search for assets. Primarily meant for human-centric use cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.

Parameters
  • name (str | None) – Fuzzy match on name.

  • description (str | None) – Fuzzy match on description.

  • query (str | None) – Whitespace-separated terms to search for in assets. Does a best-effort fuzzy search in relevant fields (currently name and description) for variations of any of the search terms, and orders results by relevance.

  • filter (AssetFilter | dict | None) – Filter to apply. Performs exact match on these fields.

  • limit (int) – Maximum number of results to return.

Returns

List of requested assets

Return type

AssetList

Examples

Search for assets by fuzzy search on name:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(name="some name")

Search for assets by exact search on name:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(filter={"name": "some name"})

Search for assets by improved multi-field fuzzy search:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(query="TAG 30 XV")

Search for assets using multiple filters, finding all assets with name similar to xyz with parent asset 123 or 456 with source some source:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets.search(name="xyz",filter={"parent_ids": [123,456],"source": "some source"})

Search for an asset with an attached label:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetFilter, LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["PUMP"])
>>> res = c.assets.search(name="xyz",filter=AssetFilter(labels=my_label_filter))

Create assets

AssetsAPI.create(asset: Sequence[Asset]) AssetList
AssetsAPI.create(asset: Asset) Asset

Create one or more assets.

You can create an arbitrary number of assets, and the SDK will split the request into multiple requests. When specifying a parent-child relation between assets using parentExternalId, the link will be resolved into an internal ID and stored as parentId.

Parameters

asset (Asset | Sequence[Asset]) – Asset or list of assets to create.

Returns

Created asset(s)

Return type

Asset | AssetList

Examples

Create new assets:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> assets = [Asset(name="asset1"), Asset(name="asset2")]
>>> res = c.assets.create(assets)

Create asset with label:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset, Label
>>> c = CogniteClient()
>>> asset = Asset(name="my_pump", labels=[Label(external_id="PUMP")])
>>> res = c.assets.create(asset)

Create asset hierarchy

AssetsAPI.create_hierarchy(assets: Sequence[Asset] | AssetHierarchy, *, upsert: bool = False, upsert_mode: Literal['patch', 'replace'] = 'patch') AssetList

Create an asset hierarchy with validation.

This helper function makes it easy to insert large asset hierarchies. It solves the problem of topological insertion order, i.e. a parent asset must exist before it can be referenced by any ‘children’ assets. You may pass any number of partial or full hierarchies: there are no requirements on the number of root assets, so you may pass zero, one or many (the same goes for the non-root assets).

Parameters
  • assets (Sequence[Asset] | AssetHierarchy) – List of assets to create or an instance of AssetHierarchy.

  • upsert (bool) – If True, already existing assets will be updated instead of an exception being raised. You may control how updates are applied with the ‘upsert_mode’ argument.

  • upsert_mode (Literal["patch", "replace"]) – Only applicable with upsert. Pass ‘patch’ to only update fields with non-null values (default), or ‘replace’ to do full updates (unset fields become null or empty).

Returns

Created (and possibly updated) asset hierarchy

Return type

AssetList

Prior to insertion, this function will run validation on the given assets and raise an error if any of the following issues are found:

  1. Any assets are invalid (category: invalid):

    • Missing external ID.

    • Missing a valid name.

    • Has an ID set.

  2. Any asset duplicates exist (category: duplicates)

  3. Any assets have an ambiguous parent link (category: unsure_parents)

  4. Any group of assets form a cycle, e.g. A->B->A (category: cycles)

As part of validation there is a fifth category that is ignored when using this method (for backwards compatibility), namely orphan assets. These are assets that link to a parent by an identifier not present among the given assets, and as such might contain links we are unable to vet ahead of insertion. They are thus assumed to be valid, but may fail.

Tip

The different categories specified above correspond to the name of the attribute you might access on the raised error to get the collection of ‘bad’ assets falling in that group, e.g. error.duplicates.

Note

Updating external_id via upsert is not supported (and will not be supported). Use AssetsAPI.update instead.

Warning

The API does not natively support upsert, so the SDK has to simulate the behaviour at the cost of some insertion speed.

Be careful when moving assets to new parents via upsert: please do so only by specifying parent_external_id (instead of parent_id) to avoid race conditions in insertion order (temporary cycles might form since we can only make changes to 1000 assets at a time).

Examples

Create an asset hierarchy:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> assets = [
...     Asset(external_id="root", name="root"),
...     Asset(external_id="child1", parent_external_id="root", name="child1"),
...     Asset(external_id="child2", parent_external_id="root", name="child2")]
>>> res = c.assets.create_hierarchy(assets)

Create an asset hierarchy, but run update for existing assets:

>>> res = c.assets.create_hierarchy(assets, upsert=True, upsert_mode="patch")

Patch will only update the parameters you have defined on your assets. Note that specifically setting something to None is the same as not setting it. For metadata, this will extend your existing data, only overwriting when keys overlap. For labels the behaviour is mostly the same: existing ones are left untouched, and your new ones are simply added.

You may also pass upsert_mode="replace" to make sure the updated assets look identical to the ones you passed to the method. For both metadata and labels this will clear out all existing, before (potentially) adding the new ones.
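
For example, to make the stored assets match the ones you pass in exactly (a minimal sketch, reusing the assets from the example above):

>>> res = c.assets.create_hierarchy(assets, upsert=True, upsert_mode="replace")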

If the hierarchy validation for some reason fails, you may inspect all the issues that were found by catching CogniteAssetHierarchyError:

>>> from cognite.client.exceptions import CogniteAssetHierarchyError
>>> try:
...     res = c.assets.create_hierarchy(assets)
... except CogniteAssetHierarchyError as err:
...     if err.invalid:
...         ...  # do something

In addition to invalid, you may inspect duplicates, unsure_parents, orphans and cycles. Note that cycles are not available if any of the other basic issues exist, as the search for cyclical references requires a clean asset hierarchy to begin with.

You may also wrap the create_hierarchy() call in a try-except to find out whether any of the assets failed to be created (assuming a valid hierarchy):

>>> from cognite.client.exceptions import CogniteAPIError
>>> try:
...     c.assets.create_hierarchy(assets)
... except CogniteAPIError as err:
...     created = err.successful
...     maybe_created = err.unknown
...     not_created = err.failed

Here’s a slightly longer explanation of the different groups:

  • err.successful: Which assets were created (request yielded a 201)

  • err.unknown: Which assets may have been created (request yielded 5xx)

  • err.failed: Which assets were not created (request yielded 4xx, or was a descendant of an asset with unknown status)

The preferred way to create an asset hierarchy is to run validation prior to insertion. You may do this by using the AssetHierarchy class. It will by default consider orphan assets to be problematic (but accepts the boolean parameter ignore_orphans), contrary to how create_hierarchy works (which accepts them in order to be backwards-compatible). It also provides helpful methods to create reports of any issues found, check out validate_and_report:

>>> from cognite.client.data_classes import AssetHierarchy
>>> from pathlib import Path
>>> hierarchy = AssetHierarchy(assets)
>>> if hierarchy.is_valid():
...     res = c.assets.create_hierarchy(hierarchy)
... else:
...     hierarchy.validate_and_report(output_file=Path("report.txt"))

Delete assets

AssetsAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, recursive: bool = False, ignore_unknown_ids: bool = False) None

Delete one or more assets

Parameters
  • id (int | Sequence[int] | None) – Id or list of ids

  • external_id (str | Sequence[str] | None) – External ID or list of external ids

  • recursive (bool) – Recursively delete whole asset subtrees under given ids. Defaults to False.

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Examples

Delete assets by id or external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.assets.delete(id=[1,2,3], external_id="3")
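
Recursively delete a whole subtree under a root asset (the external id used here is illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.assets.delete(external_id="root", recursive=True)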

Filter assets

AssetsAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, aggregated_properties: Sequence[Literal['child_count', 'path', 'depth']] | None = None, limit: int | None = 25) AssetList

Advanced filter assets

Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.

Parameters
  • filter (Filter | dict) – Filter to apply.

  • sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. Can be up to two properties to sort by; the default sort order is ascending.

  • aggregated_properties (Sequence[Literal["child_count", "path", "depth"]] | None) – Set of aggregated properties to include. Options are childCount, path, depth.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns

List of assets that match the filter criteria.

Return type

AssetList

Examples

Find all assets that have a metadata key ‘timezone’ starting with ‘Europe’, and sort by external id ascending:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> in_timezone = f.Prefix(["metadata", "timezone"], "Europe")
>>> res = c.assets.filter(filter=in_timezone,
...                       sort=("external_id", "asc"))

Note that you can check the API documentation above to see which properties you can filter on with which filters.

To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the AssetProperty and SortableAssetProperty Enums.

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.assets import AssetProperty, SortableAssetProperty
>>> c = CogniteClient()
>>> f = filters
>>> in_timezone = f.Prefix(AssetProperty.metadata_key("timezone"), "Europe")
>>> res = c.assets.filter(filter=in_timezone,
...                       sort=(SortableAssetProperty.external_id, "asc"))

Update assets

AssetsAPI.update(item: Sequence[Asset | AssetUpdate]) AssetList
AssetsAPI.update(item: Asset | AssetUpdate) Asset

Update one or more assets. Labels can be added, removed or replaced (set). Note that the set operation deletes all existing labels and adds the new, specified ones.

Parameters

item (Asset | AssetUpdate | Sequence[Asset | AssetUpdate]) – Asset(s) to update

Returns

Updated asset(s)

Return type

Asset | AssetList

Examples

Perform a partial update on an asset, updating the description and adding a new field to metadata:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res1 = c.assets.update(my_update)
>>> # Remove an already set field like so
>>> another_update = AssetUpdate(id=1).description.set(None)
>>> res2 = c.assets.update(another_update)

Remove the metadata on an asset:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).metadata.add({"key": "value"})
>>> res1 = c.assets.update(my_update)
>>> another_update = AssetUpdate(id=1).metadata.set(None)
>>> # The same result can be achieved with:
>>> another_update2 = AssetUpdate(id=1).metadata.set({})
>>> res2 = c.assets.update(another_update)

Attach labels to an asset:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.add(["PUMP", "VERIFIED"])
>>> res = c.assets.update(my_update)

Detach a single label from an asset:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.remove("PUMP")
>>> res = c.assets.update(my_update)

Replace all labels for an asset:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetUpdate
>>> c = CogniteClient()
>>> my_update = AssetUpdate(id=1).labels.set("PUMP")
>>> res = c.assets.update(my_update)

Upsert assets

AssetsAPI.upsert(item: Sequence[Asset], mode: Literal['patch', 'replace'] = 'patch') AssetList
AssetsAPI.upsert(item: Asset, mode: Literal['patch', 'replace'] = 'patch') Asset
Upsert assets, i.e., update if it exists, and create if it does not exist.

Note this is a convenience method that handles the upserting for you by first calling update on all items, and if any of them fail because they do not exist, it will create them instead.

For more details, see Upsert.

Parameters
  • item (Asset | Sequence[Asset]) – Asset or list of assets to upsert.

  • mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the assets already exist. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified.

Returns

The upserted asset(s).

Return type

Asset | AssetList

Examples

Upsert for assets:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Asset
>>> c = CogniteClient()
>>> existing_asset = c.assets.retrieve(id=1)
>>> existing_asset.description = "New description"
>>> new_asset = Asset(external_id="new_asset", description="New asset")
>>> res = c.assets.upsert([existing_asset, new_asset], mode="replace")

Asset Data classes

class cognite.client.data_classes.assets.AggregateResultItem(child_count: int | None = None, depth: int | None = None, path: list[dict[str, Any]] | None = None, **kwargs: Any)

Bases: dict

Aggregated metrics of the asset

Parameters
  • child_count (int | None) – Number of direct descendants for the asset

  • depth (int | None) – Asset path depth (number of levels below root node).

  • path (list[dict[str, Any]] | None) – IDs of assets on the path to the asset.

  • **kwargs (Any) – No description.

class cognite.client.data_classes.assets.Asset(external_id: str | None = None, name: str | None = None, parent_id: int | None = None, parent_external_id: str | None = None, description: str | None = None, data_set_id: int | None = None, metadata: dict[str, str] | None = None, source: str | None = None, labels: list[Label | str | LabelDefinition | dict] | None = None, geo_location: GeoLocation | None = None, id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, root_id: int | None = None, aggregates: dict[str, Any] | AggregateResultItem | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A representation of a physical asset, for example a factory or a piece of equipment.

Parameters
  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

  • name (str | None) – The name of the asset.

  • parent_id (int | None) – The parent of the node, null if it is the root node.

  • parent_external_id (str | None) – The external ID of the parent. The property is omitted if the asset doesn’t have a parent or if the parent doesn’t have externalId.

  • description (str | None) – The description of the asset.

  • data_set_id (int | None) – The id of the dataset this asset belongs to.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.

  • source (str | None) – The source of the asset.

  • labels (list[Label | str | LabelDefinition | dict] | None) – A list of the labels associated with this resource item.

  • geo_location (GeoLocation | None) – The geographic metadata of the asset.

  • id (int | None) – A server-generated ID for the object.

  • created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • root_id (int | None) – ID of the root asset.

  • aggregates (dict[str, Any] | AggregateResultItem | None) – Aggregated metrics of the asset

  • cognite_client (CogniteClient | None) – The client to associate with this object.

children() AssetList

Returns the children of this asset.

Returns

The requested assets

Return type

AssetList
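
A minimal example (the asset id used is illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset = c.assets.retrieve(id=1)
>>> children = asset.children()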

dump(camel_case: bool = False) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

events(**kwargs: Any) EventList

Retrieve all events related to this asset.

Parameters

**kwargs (Any) – All extra keyword arguments are passed to events/list. NB: ‘asset_ids’ can’t be used.

Returns

All events related to this asset.

Return type

EventList

files(**kwargs: Any) FileMetadataList

Retrieve all files metadata related to this asset.

Parameters

**kwargs (Any) – All extra keyword arguments are passed to files/list. NB: ‘asset_ids’ can’t be used.

Returns

Metadata about all files related to this asset.

Return type

FileMetadataList

parent() Asset

Returns this asset’s parent.

Returns

The parent asset.

Return type

Asset

sequences(**kwargs: Any) SequenceList

Retrieve all sequences related to this asset.

Parameters

**kwargs (Any) – All extra keyword arguments are passed to sequences/list. NB: ‘asset_ids’ can’t be used.

Returns

All sequences related to this asset.

Return type

SequenceList

subtree(depth: int | None = None) AssetList

Returns the subtree of this asset up to a specified depth.

Parameters

depth (int | None) – Retrieve assets up to this depth below the asset.

Returns

The requested assets sorted topologically.

Return type

AssetList
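
A minimal example (the id and depth used are illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset = c.assets.retrieve(id=1)
>>> subtree = asset.subtree(depth=2)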

time_series(**kwargs: Any) TimeSeriesList

Retrieve all time series related to this asset.

Parameters

**kwargs (Any) – All extra keyword arguments are passed to time_series/list. NB: ‘asset_ids’ can’t be used.

Returns

All time series related to this asset.

Return type

TimeSeriesList

to_pandas(expand: Sequence[str] = ('metadata', 'aggregates'), ignore: list[str] | None = None, camel_case: bool = False) pandas.DataFrame

Convert the instance into a pandas DataFrame.

Parameters
  • expand (Sequence[str]) – List of row keys to expand, only works if the value is a Dict.

  • ignore (list[str] | None) – List of row keys to not include when converting to a data frame.

  • camel_case (bool) – Convert column names to camel case (e.g. externalId instead of external_id)

Returns

The dataframe.

Return type

pandas.DataFrame
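
A minimal example (the id used is illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> asset = c.assets.retrieve(id=1)
>>> df = asset.to_pandas(expand=("metadata",), camel_case=True)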

class cognite.client.data_classes.assets.AssetAggregate(count: int | None = None, **kwargs: Any)

Bases: dict

Aggregation group of assets

Parameters
  • count (int | None) – Size of the aggregation group

  • **kwargs (Any) – No description.

class cognite.client.data_classes.assets.AssetFilter(name: str | None = None, parent_ids: Sequence[int] | None = None, parent_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, metadata: dict[str, str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, root: bool | None = None, external_id_prefix: str | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None)

Bases: CogniteFilter

Filter on assets with strict matching.

Parameters
  • name (str | None) – The name of the asset.

  • parent_ids (Sequence[int] | None) – Return only the direct descendants of the specified assets.

  • parent_external_ids (Sequence[str] | None) – Return only the direct descendants of the specified assets.

  • asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include assets in subtrees rooted at the specified assets (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.

  • data_set_ids (Sequence[dict[str, Any]] | None) – Only include assets that belong to one of the specified data sets.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.

  • source (str | None) – The source of the asset.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • root (bool | None) – Whether the filtered assets are root assets, or not. Set to True to only list root assets.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • labels (LabelFilter | None) – Return only the resource matching the specified label constraints.

  • geo_location (GeoLocationFilter | None) – Only include assets matching the specified geographic relation.

dump(camel_case: bool = False) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.assets.AssetHierarchy(assets: Sequence[Asset], ignore_orphans: bool = False)

Bases: object

Class that verifies whether a collection of Assets is valid, by validating its internal consistency. This is done “offline”, meaning CDF is -not- queried for the already existing assets. As a result, any assets providing a parent link by ID instead of external ID are assumed valid.

Parameters
  • assets (Sequence[Asset]) – Sequence of assets to be inspected for validity.

  • ignore_orphans (bool) – If true, orphan assets are assumed valid and won’t raise.

Examples

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import AssetHierarchy
>>> client = CogniteClient()
>>> hierarchy = AssetHierarchy(assets)
>>> # Get a report written to the terminal listing any issues:
>>> hierarchy.validate_and_report()
>>> if hierarchy.is_valid():
...     res = client.assets.create_hierarchy(hierarchy)
... # If there are issues, you may inspect them directly:
... else:
...     hierarchy.orphans
...     hierarchy.invalid
...     hierarchy.unsure_parents
...     hierarchy.duplicates
...     hierarchy.cycles  # Requires no other basic issues

There are other ways to generate the report than to write directly to screen. You may pass an output_file which can be either a Path object (writes are done in append-mode) or a file-like object supporting write (default is None which is just regular print):

>>> # Get a report written to file:
>>> from pathlib import Path
>>> report = Path("path/to/my_report.txt")
>>> hierarchy = AssetHierarchy(assets)
>>> hierarchy.validate_and_report(output_file=report)
>>> # Get a report as text "in memory":
>>> import io
>>> with io.StringIO() as file_like:
...     hierarchy.validate_and_report(output_file=file_like)
...     report = file_like.getvalue()

count_subtree(mapping: dict[str | None, list[Asset]]) dict[str, int]

Returns a mapping from asset external ID to the size of its subtree (children and children of children etc.).

Parameters

mapping (dict[str | None, list[Asset]]) – The mapping returned by groupby_parent_xid(). If None is passed, it will be recreated (slightly expensive).

Returns

Lookup from external ID to descendant count.

Return type

dict[str, int]

groupby_parent_xid() dict[str | None, list[Asset]]

Returns a mapping from parent external ID to a list of its direct children.

Note

If the AssetHierarchy was initialized with ignore_orphans=True, all orphan assets, if any, are returned as part of the root assets in the mapping and can be accessed by mapping[None]. The same is true for all assets linking their parent by ID.

Returns

Mapping from parent external ID (or None) to the list of its direct children.

Return type

dict[str | None, list[Asset]]
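
A sketch of how the two grouping helpers compose (assets is assumed to be a list of Asset objects, e.g. from the validation example above):

>>> from cognite.client.data_classes import AssetHierarchy
>>> hierarchy = AssetHierarchy(assets)
>>> mapping = hierarchy.groupby_parent_xid()
>>> subtree_sizes = hierarchy.count_subtree(mapping)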

class cognite.client.data_classes.assets.AssetList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[Asset], IdTransformerMixin

events() EventList

Retrieve all events related to these assets.

Returns

All events related to the assets in this AssetList.

Return type

EventList

files() FileMetadataList

Retrieve all files metadata related to these assets.

Returns

Metadata about all files related to the assets in this AssetList.

Return type

FileMetadataList

sequences() SequenceList

Retrieve all sequences related to these assets.

Returns

All sequences related to the assets in this AssetList.

Return type

SequenceList

time_series() TimeSeriesList

Retrieve all time series related to these assets.

Returns

All time series related to the assets in this AssetList.

Return type

TimeSeriesList
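
A minimal example of traversing from a list of assets to their related resources:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> assets = c.assets.list(limit=10)
>>> related_time_series = assets.time_series()
>>> related_events = assets.events()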

class cognite.client.data_classes.assets.AssetProperty(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.assets.AssetUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

Changes applied to an asset

Parameters
  • id (int | None) – A server-generated ID for the object.

  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

class cognite.client.data_classes.assets.SortableAssetProperty(value)

Bases: EnumProperty

An enumeration.

Events

Retrieve an event by id

EventsAPI.retrieve(id: int | None = None, external_id: str | None = None) Event | None

Retrieve a single event by id.

Parameters
  • id (int | None) – ID

  • external_id (str | None) – External ID

Returns

Requested event or None if it does not exist.

Return type

Event | None

Examples

Get event by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve(id=1)

Get event by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve(external_id="1")

Retrieve multiple events by id

EventsAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) EventList

Retrieve multiple events by id.

Parameters
  • ids (Sequence[int] | None) – IDs

  • external_ids (Sequence[str] | None) – External IDs

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested events.

Return type

EventList

Examples

Get events by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve_multiple(ids=[1, 2, 3])

Get events by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.retrieve_multiple(external_ids=["abc", "def"])

List events

EventsAPI.list(start_time: dict[str, Any] | TimestampRange | None = None, end_time: dict[str, Any] | EndTimeFilter | None = None, active_at_time: dict[str, Any] | TimestampRange | None = None, type: str | None = None, subtype: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None, sort: Sequence[str] | None = None, partitions: int | None = None, limit: int | None = 25) EventList

List events

Parameters
  • start_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • end_time (dict[str, Any] | EndTimeFilter | None) – Range between two timestamps.

  • active_at_time (dict[str, Any] | TimestampRange | None) – Event is considered active from its startTime to endTime inclusive. If startTime is null, event is never active. If endTime is null, event is active from startTime onwards. activeAtTime filter will match all events that are active at some point from min to max, from min, or to max, depending on which of min and max parameters are specified.

  • type (str | None) – Type of the event, e.g. ‘failure’.

  • subtype (str | None) – Subtype of the event, e.g. ‘electrical’.

  • metadata (dict[str, str] | None) – Customizable extra data about the event. String key -> String value.

  • asset_ids (Sequence[int] | None) – Asset IDs of related equipment that this event relates to.

  • asset_external_ids (Sequence[str] | None) – Asset External IDs of related equipment that this event relates to.

  • asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.

  • asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.

  • data_set_ids (int | Sequence[int] | None) – Return only events in the specified data set(s) with this id / these ids.

  • data_set_external_ids (str | Sequence[str] | None) – Return only events in the specified data set(s) with this external id / these external ids.

  • source (str | None) – The source of this event.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • sort (Sequence[str] | None) – Sort by array of selected fields. E.g. ["startTime:desc"]. Default sort order is asc when omitted. The filter accepts the following field names: startTime, endTime, createdTime, lastUpdatedTime. Only one field is supported for now.

  • partitions (int | None) – Retrieve events in parallel using this number of workers. Also requires limit=None to be passed. To prevent unexpected problems and to maximize read throughput, the API documentation recommends using at most 10 partitions. When using more than 10 partitions, actual throughput decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.

  • limit (int | None) – Maximum number of events to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns

List of requested events

Return type

EventList

Examples

List events and filter on max start time:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> event_list = c.events.list(limit=5, start_time={"max": 1500000000})

Iterate over events:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for event in c.events:
...     event # do something with the event

Iterate over chunks of events to reduce memory load:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for event_list in c.events(chunk_size=2500):
...     event_list # do something with the events
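
List failure events on a given asset (the asset id and event type used here are illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> failure_events = c.events.list(asset_ids=[123], type="failure")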

Aggregate events

EventsAPI.aggregate(filter: EventFilter | dict | None = None) list[AggregateResult]

Aggregate events

Parameters

filter (EventFilter | dict | None) – Filter on events with exact match.

Returns

List of event aggregates

Return type

list[AggregateResult]

Examples

Aggregate events:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_type = c.events.aggregate(filter={"type": "failure"})

Aggregate Event Count

EventsAPI.aggregate_count(property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, filter: EventFilter | dict | None = None) int

Count of events matching the specified filters.

Parameters
  • property (EventPropertyLike | None) – If specified, get an approximate number of events with a specific property (property is not null) that match the filters.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the events to count.

  • filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.

Returns

The number of events matching the specified filters and search.

Return type

int

Examples

Count the number of events in your CDF project:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.events.aggregate_count()

Count the number of workorder events in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> is_workorder = filters.Equals(EventProperty.type, "workorder")
>>> workorder_count = c.events.aggregate_count(advanced_filter=is_workorder)

Aggregate Event Values Cardinality

EventsAPI.aggregate_cardinality_values(property: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) int

Find the approximate number of distinct values of a property for events.

Parameters
  • property (EventPropertyLike) – The property to count the cardinality of.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.

Returns

The approximate number of distinct property values matching the specified filter.

Return type

int

Examples

Count the number of types of events in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> type_count = c.events.aggregate_cardinality_values(EventProperty.type)

Count the number of types of events linked to asset 123 in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> is_asset = filters.ContainsAny(EventProperty.asset_ids, 123)
>>> type_count = c.events.aggregate_cardinality_values(EventProperty.type, advanced_filter=is_asset)

Aggregate Event Property Cardinality

EventsAPI.aggregate_cardinality_properties(path: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) int

Find the approximate number of distinct property paths (e.g. metadata keys) for events.

Parameters
  • path (EventPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.

Returns

The approximate number of distinct property paths matching the specified filters.

Return type

int

Examples

Count the number of metadata keys for events in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> type_count = c.events.aggregate_cardinality_properties(EventProperty.metadata)

Aggregate Event Unique Values

EventsAPI.aggregate_unique_values(fields: Sequence[str], filter: EventFilter | dict | None = None, property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None) list[AggregateUniqueValuesResult]
EventsAPI.aggregate_unique_values(fields: Literal[None] = None, filter: EventFilter | dict | None = None, property: EventPropertyLike | None = None, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None) UniqueResultList

Get unique properties with counts for events.

Parameters
  • fields (Sequence[str] | None) – The fields to return. Defaults to ["count"].

  • filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.

  • property (EventPropertyLike | None) – The property name(s) to apply the aggregation on.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the events to consider.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

Returns

List of unique values of events matching the specified filters and search.

Return type

list[AggregateUniqueValuesResult] | UniqueResultList

Examples

Get the unique types with count of events in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> result = c.events.aggregate_unique_values(property=EventProperty.type)
>>> print(result.unique)

Get the unique types of events after 2020-01-01 in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> is_after_2020 = filters.Range(EventProperty.start_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.events.aggregate_unique_values(property=EventProperty.type, advanced_filter=is_after_2020)
>>> print(result.unique)

Get the unique types of events after 2020-01-01 in your CDF project, but exclude all types that start with “planned”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import aggregations, filters
>>> from cognite.client.data_classes.events import EventProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> agg = aggregations
>>> not_planned = agg.Not(agg.Prefix("planned"))
>>> is_after_2020 = filters.Range(EventProperty.start_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.events.aggregate_unique_values(property=EventProperty.type, advanced_filter=is_after_2020, aggregate_filter=not_planned)
>>> print(result.unique)

Aggregate Event Unique Properties

EventsAPI.aggregate_unique_properties(path: EventPropertyLike, advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: EventFilter | dict | None = None) UniqueResultList

Get unique paths with counts for events.

Parameters
  • path (EventPropertyLike) – The scope in every document to aggregate properties. The only value allowed now is ["metadata"]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the events to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (EventFilter | dict | None) – The filter to narrow down the events to count requiring exact match.

Returns

List of unique values of events matching the specified filters.

Return type

UniqueResultList

Examples

Get the unique metadata keys with count of events in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventProperty
>>> c = CogniteClient()
>>> result = c.events.aggregate_unique_properties(EventProperty.metadata)
>>> print(result.unique)

Search for events

EventsAPI.search(description: str | None = None, filter: EventFilter | dict | None = None, limit: int = 25) EventList

Search for events. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.

Parameters
  • description (str | None) – Fuzzy match on description.

  • filter (EventFilter | dict | None) – Filter to apply. Performs exact match on these fields.

  • limit (int) – Maximum number of results to return.

Returns

List of requested events

Return type

EventList

Examples

Search for events:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.events.search(description="some description")

Create events

EventsAPI.create(event: Sequence[Event]) EventList
EventsAPI.create(event: Event) Event

Create one or more events.

Parameters

event (Event | Sequence[Event]) – Event or list of events to create.

Returns

Created event(s)

Return type

Event | EventList

Examples

Create new events:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Event
>>> c = CogniteClient()
>>> events = [Event(start_time=0, end_time=1), Event(start_time=2, end_time=3)]
>>> res = c.events.create(events)

Delete events

EventsAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, ignore_unknown_ids: bool = False) None

Delete one or more events

Parameters
  • id (int | Sequence[int] | None) – Id or list of ids

  • external_id (str | Sequence[str] | None) – External ID or list of external ids

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Examples

Delete events by id or external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.events.delete(id=[1,2,3], external_id="3")

Update events

EventsAPI.update(item: Sequence[Event | EventUpdate]) EventList
EventsAPI.update(item: Event | EventUpdate) Event

Update one or more events

Parameters

item (Event | EventUpdate | Sequence[Event | EventUpdate]) – Event(s) to update

Returns

Updated event(s)

Return type

Event | EventList

Examples

Update an event that you have fetched. This will perform a full update of the event:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> event = c.events.retrieve(id=1)
>>> event.description = "New description"
>>> res = c.events.update(event)

Perform a partial update on an event, updating the description and adding a new field to metadata:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import EventUpdate
>>> c = CogniteClient()
>>> my_update = EventUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.events.update(my_update)

Upsert events

EventsAPI.upsert(item: Sequence[Event], mode: Literal['patch', 'replace'] = 'patch') EventList
EventsAPI.upsert(item: Event, mode: Literal['patch', 'replace'] = 'patch') Event

Upsert events, i.e., update if it exists, and create if it does not exist.

Note this is a convenience method that handles the upserting for you by first calling update on all items, and if any of them fail because they do not exist, it will create them instead.

For more details, see Upsert.

Parameters
  • item (Event | Sequence[Event]) – Event or list of events to upsert.

  • mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the events are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified.

Returns

The upserted event(s).

Return type

Event | EventList

Examples

Upsert for events:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Event
>>> c = CogniteClient()
>>> existing_event = c.events.retrieve(id=1)
>>> existing_event.description = "New description"
>>> new_event = Event(external_id="new_event", description="New event")
>>> res = c.events.upsert([existing_event, new_event], mode="replace")

Filter events

EventsAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) EventList

Advanced filter events

Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.

Parameters
  • filter (Filter | dict) – Filter to apply.

  • sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. You can sort by up to two properties; the default sort order is ascending.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

List of events that match the filter criteria.

Return type

EventList

Examples

Find all events that have an external id with the prefix “workorder” and the word ‘failure’ in the description, sorted by start time in descending order:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_workorder = f.Prefix("external_id", "workorder")
>>> has_failure = f.Search("description", "failure")
>>> res = c.events.filter(filter=f.And(is_workorder, has_failure),
...                       sort=("start_time", "desc"))

Note that you can check the API documentation above to see which properties you can filter on with which filters.

To avoid spelling mistakes and to make it easier to look up available properties for filtering and sorting, you can also use the EventProperty and SortableEventProperty enums.

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.events import EventProperty, SortableEventProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_workorder = f.Prefix(EventProperty.external_id, "workorder")
>>> has_failure = f.Search(EventProperty.description, "failure")
>>> res = c.events.filter(filter=f.And(is_workorder, has_failure),
...                       sort=(SortableEventProperty.start_time, "desc"))

Events Data classes

class cognite.client.data_classes.events.EndTimeFilter(max: int | None = None, min: int | None = None, is_null: bool | None = None, **kwargs: Any)

Bases: dict

Either range between two timestamps or isNull filter condition.

Parameters
  • max (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • min (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • is_null (bool | None) – Set to true if you want to search for data with field value not set, false to search for cases where some value is present.

  • **kwargs (Any) – No description.
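
A minimal usage sketch (illustrative; not part of the SDK reference): an EndTimeFilter can be passed as the end_time of an EventFilter, for example to match events that are still ongoing (no end time set):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EndTimeFilter, EventFilter
>>> c = CogniteClient()
>>> # is_null=True matches events whose end_time is not set, i.e. ongoing events
>>> ongoing = EventFilter(end_time=EndTimeFilter(is_null=True))
>>> res = c.events.search(filter=ongoing)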

class cognite.client.data_classes.events.Event(external_id: str | None = None, data_set_id: int | None = None, start_time: int | None = None, end_time: int | None = None, type: str | None = None, subtype: str | None = None, description: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, source: str | None = None, id: int | None = None, last_updated_time: int | None = None, created_time: int | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

An event represents something that happened at a given interval in time, e.g. a failure, a work order, etc.

Parameters
  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

  • data_set_id (int | None) – The id of the dataset this event belongs to.

  • start_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • end_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • type (str | None) – Type of the event, e.g. ‘failure’.

  • subtype (str | None) – SubType of the event, e.g. ‘electrical’.

  • description (str | None) – Textual description of the event.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 128000 bytes, up to 256 key-value pairs, of total size at most 200000.

  • asset_ids (Sequence[int] | None) – Asset IDs of equipment that this event relates to.

  • source (str | None) – The source of this event.

  • id (int | None) – A server-generated ID for the object.

  • last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • cognite_client (CogniteClient | None) – The client to associate with this object.
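
For illustration (the field values below are made up), an Event is constructed directly from these parameters:

>>> from cognite.client.data_classes import Event
>>> event = Event(
...     external_id="pump-123-failure",  # hypothetical identifier
...     type="failure",
...     subtype="electrical",
...     start_time=1600000000000,  # ms since epoch
...     end_time=1600003600000,
...     asset_ids=[42],
...     metadata={"severity": "high"})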

class cognite.client.data_classes.events.EventFilter(start_time: dict[str, Any] | TimestampRange | None = None, end_time: dict[str, Any] | EndTimeFilter | None = None, active_at_time: dict[str, Any] | TimestampRange | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, source: str | None = None, type: str | None = None, subtype: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None)

Bases: CogniteFilter

Filter on events with exact match.

Parameters
  • start_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • end_time (dict[str, Any] | EndTimeFilter | None) – Either range between two timestamps or isNull filter condition.

  • active_at_time (dict[str, Any] | TimestampRange | None) – Event is considered active from its startTime to endTime inclusive. If startTime is null, event is never active. If endTime is null, event is active from startTime onwards. activeAtTime filter will match all events that are active at some point from min to max, from min, or to max, depending on which of min and max parameters are specified.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 128000 bytes, up to 256 key-value pairs, of total size at most 200000.

  • asset_ids (Sequence[int] | None) – Asset IDs of equipment that this event relates to.

  • asset_external_ids (Sequence[str] | None) – Asset External IDs of equipment that this event relates to.

  • asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include events that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.

  • data_set_ids (Sequence[dict[str, Any]] | None) – Only include events that belong to these datasets.

  • source (str | None) – The source of this event.

  • type (str | None) – Type of the event, e.g. ‘failure’.

  • subtype (str | None) – SubType of the event, e.g. ‘electrical’.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.
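
As a sketch of the active_at_time filter described above (the timestamps are made-up milliseconds since epoch; since the parameter also accepts a plain dict, no extra imports are needed):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.events import EventFilter
>>> c = CogniteClient()
>>> # Match 'failure' events active at some point in the given window
>>> flt = EventFilter(
...     type="failure",
...     active_at_time={"min": 1577836800000, "max": 1580515200000})
>>> res = c.events.search(filter=flt)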

class cognite.client.data_classes.events.EventList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[Event], IdTransformerMixin

class cognite.client.data_classes.events.EventProperty(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.events.EventUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

Changes will be applied to event.

Parameters
  • id (int) – A server-generated ID for the object.

  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

class cognite.client.data_classes.events.SortableEventProperty(value)

Bases: EnumProperty

An enumeration.

Data points

Warning

TimeSeries unit support is a new feature:
  • The API specification is in beta.

  • The SDK implementation is in alpha.

Unit conversion is implemented in the Datapoints APIs with the parameters target_unit and target_unit_system in the retrieve methods below. It is only the use of these arguments that is in alpha. Using the methods below without these arguments is stable.

Thus, breaking changes may occur without further notice, see Alpha and Beta Features for more information.

Retrieve datapoints

DatapointsAPI.retrieve(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False) Datapoints | DatapointsList | None

Retrieve datapoints for one or more time series.

Performance guide:

In order to retrieve millions of datapoints as efficiently as possible, here are a few guidelines:

  1. For best speed, and significantly lower memory usage, consider using retrieve_arrays(...) which uses numpy.ndarrays for data storage.

  2. Only unlimited queries (limit=None) are fetched in parallel, so specifying a large finite limit, like 1 million, comes with a severe performance penalty, as data is fetched serially.

  3. Try to avoid specifying start and end to be very far from the actual data: If you have data from 2000 to 2015, don’t set start=0 (1970).

Parameters
  • id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.

  • external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.

  • start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.

  • end (int | str | datetime | None) – Exclusive end. Default: “now”

  • aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)

  • granularity (str | None) – The granularity to fetch aggregates at. e.g. ’15s’, ‘2h’, ‘10d’. Default: None.

  • target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.

  • target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.

  • limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)

  • include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False

  • ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False

Returns

A Datapoints object containing the requested data, or a DatapointsList if multiple time series were asked for (the ordering is ids first, then external_ids). If ignore_unknown_ids is True and the single requested time series is not found, the function will return None.

Return type

Datapoints | DatapointsList | None

Examples

You can specify the identifiers of the datapoints you wish to retrieve in a number of ways. In this example we are using the time-ago format to get raw data for the time series with id=42 from 2 weeks ago up until now:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps = client.time_series.data.retrieve(id=42, start="2w-ago")

You can also get aggregated values, such as max or average. You may also fetch more than one time series simultaneously. Here we are getting daily averages and maximum values for all of 2018, for two different time series, where we’re specifying start and end as integers (milliseconds after epoch). Note that we are fetching them using their external ids:

>>> dps_lst = client.time_series.data.retrieve(
...    external_id=["foo", "bar"],
...    start=1514764800000,
...    end=1546300800000,
...    aggregates=["max", "average"],
...    granularity="1d")

In the two code examples above, we have a dps object (an instance of Datapoints), and a dps_lst object (an instance of DatapointsList). On dps, which in this case contains raw datapoints, you may access the underlying data directly by using the .value attribute. This works for both numeric and string (raw) datapoints, but not aggregates - they must be accessed by their respective names, because you’re allowed to fetch up to 10 aggregates simultaneously, and they are stored on the same object:

>>> raw_data = dps.value
>>> first_dps = dps_lst[0]  # optionally: `dps_lst.get(external_id="foo")`
>>> avg_data = first_dps.average
>>> max_data = first_dps.max

You may also slice a Datapoints object (you get Datapoints back), or ask for “a row of data” at a single index in the same way you would do with a built-in list (you get a Datapoint object back, note the singular name). You’ll also get Datapoint objects when iterating through a Datapoints object, but this should generally be avoided (consider this a performance warning):

>>> dps_slice = dps[-10:]  # Last ten values
>>> dp = dps[3]  # The third value
>>> for dp in dps_slice:
...     pass  # do something!

All parameters can be individually set if you pass (one or more) dictionaries (even ignore_unknown_ids, contrary to the API). If you also pass top-level parameters, these will be overruled by the individual parameters (where both exist). You are free to mix any kind of ids and external ids: Single identifiers, single dictionaries and (mixed) lists of these.

Let’s say you want different aggregates and end-times for a few time series (when only fetching a single aggregate, you may pass the string directly for convenience):

>>> dps_lst = client.time_series.data.retrieve(
...     id=[
...         {"id": 42, "end": "1d-ago", "aggregates": "average"},
...         {"id": 69, "end": "2d-ago", "aggregates": ["average"]},
...         {"id": 96, "end": "3d-ago", "aggregates": ["min", "max", "count"]},
...     ],
...     external_id={"external_id": "foo", "aggregates": "max"},
...     start="5d-ago",
...     granularity="1h")

When requesting multiple time series, an easy way to get the datapoints of a specific one is to use the .get method on the returned DatapointsList object, then specify if you want id or external_id. Note: If you fetch a time series by using id, you can still access it with its external_id (and the opposite way around), if you know it:

>>> from datetime import datetime, timezone
>>> utc = timezone.utc
>>> dps_lst = client.time_series.data.retrieve(
...     start=datetime(1907, 10, 14, tzinfo=utc),
...     end=datetime(1907, 11, 6, tzinfo=utc),
...     id=[42, 43, 44, ..., 499, 500],
... )
>>> ts_350 = dps_lst.get(id=350)  # ``Datapoints`` object

…but what happens if you request some duplicate ids or external_ids? In this example we will show how to get data from multiple disconnected periods. Let’s say you’re tasked to train a machine learning model to recognize a specific failure mode of a system, and you want the training data to only be from certain periods (when an alarm was on/high). Assuming these alarms are stored as events in CDF, with both start- and end times, we can use these directly in the query.

After fetching, the .get method will instead return a list of Datapoints (assuming we have more than one event), in the same order, similar to how slicing works with non-unique indices on pandas DataFrames:

>>> periods = client.events.list(type="alarm", subtype="pressure")
>>> sensor_xid = "foo-pressure-bar"
>>> dps_lst = client.time_series.data.retrieve(
...     id=[42, 43, 44],
...     external_id=[
...         {"external_id": sensor_xid, "start": ev.start_time, "end": ev.end_time}
...         for ev in periods
...     ])
>>> ts_44 = dps_lst.get(id=44)  # Single ``Datapoints`` object
>>> ts_lst = dps_lst.get(external_id=sensor_xid)  # List of ``len(periods)`` ``Datapoints`` objects

The API has an endpoint to “retrieve latest (before)”, but not “after”. Luckily, we can emulate that behaviour easily. Let’s say we have a very dense time series and do not want to fetch all of the available raw data (or fetch less precise aggregate data), just to get the very first datapoint of every month (from e.g. the year 2000 through 2010):

>>> import itertools
>>> month_starts = [
...     datetime(year, month, 1, tzinfo=utc)
...     for year, month in itertools.product(range(2000, 2011), range(1, 13))]
>>> dps_lst = client.time_series.data.retrieve(
...     external_id=[{"external_id": "foo", "start": start} for start in month_starts],
...     limit=1)

To get all historic and future datapoints for a time series, e.g. to do a backup, you may want to import the two integer constants: MIN_TIMESTAMP_MS and MAX_TIMESTAMP_MS, to make sure you do not miss any. Performance warning: This pattern of fetching datapoints from the entire valid time domain is slower and shouldn’t be used for regular “day-to-day” queries:

>>> from cognite.client.utils import MIN_TIMESTAMP_MS, MAX_TIMESTAMP_MS
>>> dps_backup = client.time_series.data.retrieve(
...     id=123,
...     start=MIN_TIMESTAMP_MS,
...     end=MAX_TIMESTAMP_MS + 1)  # end is exclusive

Another example here is just to showcase the great flexibility of the retrieve endpoint, with a very custom query:

>>> ts1 = 1337
>>> ts2 = {
...     "id": 42,
...     "start": -12345,  # Overrides `start` arg. below
...     "end": "1h-ago",
...     "limit": 1000,  # Overrides `limit` arg. below
...     "include_outside_points": True,
... }
>>> ts3 = {
...     "id": 11,
...     "end": "1h-ago",
...     "aggregates": ["max"],
...     "granularity": "42h",
...     "include_outside_points": False,
...     "ignore_unknown_ids": True,  # Overrides `ignore_unknown_ids` arg. below
... }
>>> dps_lst = client.time_series.data.retrieve(
...    id=[ts1, ts2, ts3], start="2w-ago", limit=None, ignore_unknown_ids=False)

If the time series was created with a ‘unit_external_id’, we can use the ‘target_unit’ parameter to convert the datapoints to the desired unit. In the example below, we assume that the time series has unit_external_id ‘temperature:deg_c’ and id=42.

>>> client.time_series.data.retrieve(
...   id=42, start="2w-ago", limit=None, target_unit="temperature:deg_f")

Or alternatively, we can use the ‘target_unit_system’ parameter to convert the datapoints to the desired unit system.

>>> client.time_series.data.retrieve(
...   id=42, start="2w-ago", limit=None, target_unit_system="Imperial")

Retrieve datapoints as numpy arrays

DatapointsAPI.retrieve_arrays(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False) DatapointsArray | DatapointsArrayList | None

Retrieve datapoints for one or more time series.

Note: This method requires numpy to be installed.

Parameters
  • id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.

  • external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.

  • start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.

  • end (int | str | datetime | None) – Exclusive end. Default: “now”

  • aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)

  • granularity (str | None) – The granularity to fetch aggregates at. e.g. ’15s’, ‘2h’, ‘10d’. Default: None.

  • target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.

  • target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.

  • limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)

  • include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False

  • ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False

Returns

A DatapointsArray object containing the requested data, or a DatapointsArrayList if multiple time series were asked for (the ordering is ids first, then external_ids). If ignore_unknown_ids is True and the single requested time series is not found, the function will return None.

Return type

DatapointsArray | DatapointsArrayList | None

Examples

Note: For many more usage examples, check out the retrieve() method which accepts exactly the same arguments.

Get weekly min and max aggregates for a time series with id=42 since the year 2000, then compute the range of values:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> client = CogniteClient()
>>> dps = client.time_series.data.retrieve_arrays(
...     id=42,
...     start=datetime(2020, 1, 1, tzinfo=timezone.utc),
...     aggregates=["min", "max"],
...     granularity="7d")
>>> weekly_range = dps.max - dps.min

Get up to 2 million raw datapoints for the last 48 hours for a noisy time series with external_id=”ts-noisy”, then use a small and a wide moving average filter to smooth it out:

>>> import numpy as np
>>> dps = client.time_series.data.retrieve_arrays(
...     external_id="ts-noisy",
...     start="2d-ago",
...     limit=2_000_000)
>>> smooth = np.convolve(dps.value, np.ones(5) / 5)  
>>> smoother = np.convolve(dps.value, np.ones(20) / 20)  

Get raw datapoints for multiple time series that may or may not exist, from the last 2 hours, then find the largest gap between two consecutive values for all time series, also taking the previous value into account (outside point).

>>> id_lst = [42, 43, 44]
>>> dps_lst = client.time_series.data.retrieve_arrays(
...     id=id_lst,
...     start="2h-ago",
...     include_outside_points=True,
...     ignore_unknown_ids=True)
>>> largest_gaps = [np.max(np.diff(dps.timestamp)) for dps in dps_lst]

Get raw datapoints for a time series with external_id=”bar” from the last 10 weeks, then convert to a pandas.Series (you can of course also use the to_pandas() convenience method if you want a pandas.DataFrame):

>>> import pandas as pd
>>> dps = client.time_series.data.retrieve_arrays(external_id="bar", start="10w-ago")
>>> series = pd.Series(dps.value, index=dps.timestamp)

Retrieve datapoints in pandas dataframe

DatapointsAPI.retrieve_dataframe(*, id: None | int | dict[str, Any] | Sequence[int | dict[str, Any]] = None, external_id: None | str | dict[str, Any] | Sequence[str | dict[str, Any]] = None, start: int | str | datetime | None = None, end: int | str | datetime | None = None, aggregates: Aggregate | str | list[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, limit: int | None = None, include_outside_points: bool = False, ignore_unknown_ids: bool = False, uniform_index: bool = False, include_aggregate_name: bool = True, include_granularity_name: bool = False, column_names: Literal['id', 'external_id'] = 'external_id') pd.DataFrame

Get datapoints directly in a pandas dataframe.

Note: If you have duplicated time series in your query, the dataframe columns will also contain duplicates.

Parameters
  • id (None | int | dict[str, Any] | Sequence[int | dict[str, Any]]) – Id, dict (with id) or (mixed) sequence of these. See examples below.

  • external_id (None | str | dict[str, Any] | Sequence[str | dict[str, Any]]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.

  • start (int | str | datetime | None) – Inclusive start. Default: 1970-01-01 UTC.

  • end (int | str | datetime | None) – Exclusive end. Default: “now”

  • aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)

  • granularity (str | None) – The granularity to fetch aggregates at. e.g. ’15s’, ‘2h’, ‘10d’. Default: None.

  • target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.

  • target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.

  • limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)

  • include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False

  • ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False

  • uniform_index (bool) – If only querying aggregates AND a single granularity is used AND no limit is used, specifying uniform_index=True will return a dataframe with an equidistant datetime index from the earliest start to the latest end (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False

  • include_aggregate_name (bool) – Include ‘aggregate’ in the column name, e.g. my-ts|average. Ignored for raw time series. Default: True

  • include_granularity_name (bool) – Include ‘granularity’ in the column name, e.g. my-ts|12h. Added after ‘aggregate’ when present. Ignored for raw time series. Default: False

  • column_names (Literal["id", "external_id"]) – Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: “external_id”

Returns

A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids. For time series with multiple aggregates, they will be sorted in alphabetical order (“average” before “max”).

Return type

pd.DataFrame

Examples

Get a pandas dataframe using a single id, and use this id as column name, with no more than 100 datapoints:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe(
...     id=12345,
...     start="2w-ago",
...     end="now",
...     limit=100,
...     column_names="id")

Get the pandas dataframe with a uniform index (fixed spacing between points) of 1 day, for two time series with individually specified aggregates, from 1990 through 2020:

>>> from datetime import datetime, timezone
>>> df = client.time_series.data.retrieve_dataframe(
...     id=[
...         {"external_id": "foo", "aggregates": ["discrete_variance"]},
...         {"external_id": "bar", "aggregates": ["total_variation", "continuous_variance"]},
...     ],
...     granularity="1d",
...     start=datetime(1990, 1, 1, tzinfo=timezone.utc),
...     end=datetime(2020, 12, 31, tzinfo=timezone.utc),
...     uniform_index=True)

Get a pandas dataframe containing the ‘average’ aggregate for two time series using a 30-day granularity, starting Jan 1, 1970 all the way up to present, without having the aggregate name in the column names:

>>> df = client.time_series.data.retrieve_dataframe(
...     external_id=["foo", "bar"],
...     aggregates=["average"],
...     granularity="30d",
...     include_aggregate_name=False)

Remember that pandas.Timestamp is a subclass of datetime, so you can use Timestamps as start and end arguments:

>>> import pandas as pd
>>> df = client.time_series.data.retrieve_dataframe(
...     external_id="foo",
...     start=pd.Timestamp("2023-01-01"),
...     end=pd.Timestamp("2023-02-01"),
...     )

Retrieve datapoints in time zone in pandas dataframe

DatapointsAPI.retrieve_dataframe_in_tz(*, id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, start: datetime, end: datetime, aggregates: Aggregate | str | Sequence[Aggregate | str] | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, ignore_unknown_ids: bool = False, uniform_index: bool = False, include_aggregate_name: bool = True, include_granularity_name: bool = False, column_names: Literal['id', 'external_id'] = 'external_id') pd.DataFrame

Get datapoints directly in a pandas dataframe in the same time zone as start and end.

Note

This is a convenience method. It builds on top of the methods retrieve_arrays and retrieve_dataframe. It enables you to get correct daily, weekly, monthly, quarterly, and yearly aggregates in your local time zone, with automatic handling of daylight saving time (DST) transitions. If your time zone observes DST, and your query crosses at least one DST-boundary, granularities like “3 days” or “1 week”, which used to represent fixed durations, no longer do so. To understand why, let’s illustrate with an example: A typical time zone (above the equator) that observes DST will skip one hour ahead during spring, leading to a day that is only 23 hours long, and oppositely in the fall, turning back the clock one hour, yielding a 25-hour-long day.

In short, this method works as follows:
  1. Get the time zone from start and end (must be equal).

  2. Split the time range from start to end into intervals based on DST boundaries.

  3. Create a query for each interval and pass all to the retrieve_arrays method.

  4. Stack the resulting arrays into a single column in the resulting DataFrame.

Warning

The queries to retrieve_arrays are translated to a multiple of hours. This means that time zones that are not a whole hour offset from UTC are not supported (yet). The same is true for time zones that observe DST with an offset from standard time that is not a multiple of 1 hour.

Parameters
  • id (int | Sequence[int] | None) – ID or list of IDs.

  • external_id (str | Sequence[str] | None) – External ID or list of External IDs.

  • start (datetime) – Inclusive start, must be time zone aware.

  • end (datetime) – Exclusive end, must be time zone aware and have the same time zone as start.

  • aggregates (Aggregate | str | Sequence[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Default: None (raw datapoints returned)

  • granularity (str | None) – The granularity to fetch aggregates at, supported are: second, minute, hour, day, week, month, quarter and year. Default: None.

  • target_unit (str | None) – The unit_external_id of the data points returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.

  • target_unit_system (str | None) – The unit system of the data points returned. Cannot be used with target_unit.

  • ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False

  • uniform_index (bool) – If querying aggregates, specifying uniform_index=True will return a dataframe with an index with constant spacing between timestamps decided by granularity all the way from start to end (missing values will be NaNs). Default: False

  • include_aggregate_name (bool) – Include ‘aggregate’ in the column name, e.g. my-ts|average. Ignored for raw time series. Default: True

  • include_granularity_name (bool) – Include ‘granularity’ in the column name, e.g. my-ts|12h. Added after ‘aggregate’ when present. Ignored for raw time series. Default: False

  • column_names (Literal["id", "external_id"]) – Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: “external_id”

Returns

A pandas DataFrame containing the requested time series with a DatetimeIndex localized in the given time zone.

Return type

pd.DataFrame

Examples

Get a pandas dataframe in the time zone of Oslo, Norway:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime
>>> # In Python >=3.9 you may import directly from `zoneinfo`
>>> from cognite.client.utils import ZoneInfo
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe_in_tz(
...     id=12345,
...     start=datetime(2023, 1, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     end=datetime(2023, 2, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     aggregates="average",
...     granularity="1week",
...     column_names="id")

Get a pandas dataframe with the sum and continuous variance of the time series with external id “foo” and “bar”, for each quarter from 2020 to 2022 returned in the time zone of Oslo, Norway:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime
>>> # In Python >=3.9 you may import directly from `zoneinfo`
>>> from cognite.client.utils import ZoneInfo
>>> client = CogniteClient()
>>> df = client.time_series.data.retrieve_dataframe_in_tz(
...     external_id=["foo", "bar"],
...     aggregates=["sum", "continuous_variance"],
...     granularity="1quarter",
...     start=datetime(2020, 1, 1, tzinfo=ZoneInfo("Europe/Oslo")),
...     end=datetime(2022, 12, 31, tzinfo=ZoneInfo("Europe/Oslo")))

Tip

You can also use shorter granularities such as second(s), minute(s), hour(s), which do not require any special handling of DST. The longer granularities at your disposal, which are adjusted for DST, are: day(s), week(s), month(s), quarter(s) and year(s). All the granularities support a one-letter version: s, m, h, d, w, q, and y, except for month, to avoid confusion with minutes. Furthermore, the granularity is expected to be given in lowercase.

Retrieve latest datapoint

DatapointsAPI.retrieve_latest(id: int | LatestDatapointQuery | list[int | LatestDatapointQuery] | None = None, external_id: str | LatestDatapointQuery | list[str | LatestDatapointQuery] | None = None, before: None | int | str | datetime = None, ignore_unknown_ids: bool = False) Datapoints | DatapointsList | None

Get the latest datapoint for one or more time series

Parameters
  • id (int | LatestDatapointQuery | list[int | LatestDatapointQuery] | None) – Id or list of ids.

  • external_id (str | LatestDatapointQuery | list[str | LatestDatapointQuery] | None) – External id or list of external ids.

  • before (None | int | str | datetime) – Get latest datapoint before this time. Not used when passing ‘LatestDatapointQuery’.

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

A Datapoints object containing the requested data, or a DatapointsList if multiple were requested. If ignore_unknown_ids is True and the single requested time series is not found, the function will return None.

Return type

Datapoints | DatapointsList | None

Examples

Getting the latest datapoint in a time series. This method returns a Datapoints object, so the datapoint will be the first element:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.data.retrieve_latest(id=1)[0]

You can also get the first datapoint before a specific time:

>>> res = c.time_series.data.retrieve_latest(id=1, before="2d-ago")[0]

You may also pass an instance of LatestDatapointQuery:

>>> from cognite.client.data_classes import LatestDatapointQuery
>>> res = c.time_series.data.retrieve_latest(id=LatestDatapointQuery(id=1, before=60_000))[0]

If you need the latest datapoint for multiple time series, simply give a list of ids. Note that we are using external ids here, but either will work:

>>> res = c.time_series.data.retrieve_latest(external_id=["abc", "def"])
>>> latest_abc = res[0][0]
>>> latest_def = res[1][0]

If you need to specify a different value of ‘before’ for each time series, you may pass several LatestDatapointQuery objects:

>>> from datetime import datetime, timezone
>>> id_queries = [
...     123,
...     LatestDatapointQuery(id=456, before="1w-ago"),
...     LatestDatapointQuery(id=789, before=datetime(2018,1,1, tzinfo=timezone.utc))]
>>> res = c.time_series.data.retrieve_latest(
...     id=id_queries,
...     external_id=LatestDatapointQuery(external_id="abc", before="3h-ago"))

Insert data points

DatapointsAPI.insert(datapoints: Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime]] | Sequence[tuple[int | float | datetime, int | float | str]], id: int | None = None, external_id: str | None = None) None

Insert datapoints into a time series

Timestamps can be represented as milliseconds since epoch or datetime objects.

Parameters
  • datapoints (Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime]] | Sequence[tuple[int | float | datetime, int | float | str]]) – The datapoints you wish to insert. Can either be a list of tuples, a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below.

  • id (int | None) – Id of time series to insert datapoints into.

  • external_id (str | None) – External id of time series to insert datapoint into.

Examples

Your datapoints can be a list of tuples where the first element is the timestamp and the second element is the value:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> c = CogniteClient()
>>> # With datetime objects:
>>> datapoints = [
...     (datetime(2018,1,1, tzinfo=timezone.utc), 1000),
...     (datetime(2018,1,2, tzinfo=timezone.utc), 2000),
... ]
>>> c.time_series.data.insert(datapoints, id=1)
>>> # With ms since epoch:
>>> datapoints = [(150000000000, 1000), (160000000000, 2000)]
>>> c.time_series.data.insert(datapoints, id=2)

Or they can be a list of dictionaries:

>>> datapoints = [
...     {"timestamp": 150000000000, "value": 1000},
...     {"timestamp": 160000000000, "value": 2000},
... ]
>>> c.time_series.data.insert(datapoints, external_id="abcd")

Or they can be a Datapoints or DatapointsArray object (with raw datapoints only). Note that the id or external_id set on these objects is not inspected/used (as they belong to the “from-time-series”, and not the “to-time-series”), and so you must explicitly pass the identifier of the time series you want to insert into, which in this example is external_id=”foo”:

>>> data = c.time_series.data.retrieve(external_id="abc", start="1w-ago", end="now")
>>> c.time_series.data.insert(data, external_id="foo")

Insert data points into multiple time series

DatapointsAPI.insert_multiple(datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) None

Insert datapoints into multiple time series

Parameters

datapoints (list[dict[str, str | int | list | Datapoints | DatapointsArray]]) – The datapoints you wish to insert along with the ids of the time series. See examples below.

Examples

Your datapoints can be a list of dictionaries, each containing datapoints for a different (presumably) time series. These dictionaries must have the key “datapoints” (containing the data) specified as a Datapoints object, a DatapointsArray object, or list of either tuples (timestamp, value) or dictionaries, {“timestamp”: ts, “value”: value}:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timezone
>>> client = CogniteClient()

>>> datapoints = []
>>> # With datetime objects and id
>>> datapoints.append(
...     {"id": 1, "datapoints": [
...         (datetime(2018,1,1,tzinfo=timezone.utc), 1000),
...         (datetime(2018,1,2,tzinfo=timezone.utc), 2000),
... ]})

>>> # With ms since epoch and external_id:
>>> datapoints.append({"external_id": "foo", "datapoints": [(150000000000, 1000), (160000000000, 2000)]})

>>> # With raw data in a Datapoints object (or DatapointsArray):
>>> data_to_clone = client.time_series.data.retrieve(external_id="bar")
>>> datapoints.append({"external_id": "bar-clone", "datapoints": data_to_clone})
>>> client.time_series.data.insert_multiple(datapoints)

Insert pandas dataframe

DatapointsAPI.insert_dataframe(df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) None

Insert a dataframe (columns must be unique).

The index of the dataframe must contain the timestamps (pd.DatetimeIndex). The names of the columns specify the ids or external ids of the time series to which the datapoints will be written.

Said time series must already exist.

Parameters
  • df (pd.DataFrame) – Pandas DataFrame object containing the time series.

  • external_id_headers (bool) – Interpret the column names as external id. Pass False if using ids. Default: True.

  • dropna (bool) – Set to True to ignore NaNs in the given DataFrame, applied per column. Default: True.

Examples

Post a dataframe with white noise:

>>> import numpy as np
>>> import pandas as pd
>>> from cognite.client import CogniteClient
>>>
>>> client = CogniteClient()
>>> ts_xid = "my-foo-ts"
>>> idx = pd.date_range(start="2018-01-01", periods=100, freq="1d")
>>> noise = np.random.normal(0, 1, 100)
>>> df = pd.DataFrame({ts_xid: noise}, index=idx)
>>> client.time_series.data.insert_dataframe(df)

Delete a range of data points

DatapointsAPI.delete_range(start: int | str | datetime, end: int | str | datetime, id: int | None = None, external_id: str | None = None) None

Delete a range of datapoints from a time series.

Parameters
  • start (int | str | datetime) – Inclusive start of delete range

  • end (int | str | datetime) – Exclusive end of delete range

  • id (int | None) – Id of time series to delete data from

  • external_id (str | None) – External id of time series to delete data from

Examples

Deleting the last week of data from a time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.data.delete_range(start="1w-ago", end="now", id=1)

Delete ranges of data points

DatapointsAPI.delete_ranges(ranges: list[dict[str, Any]]) None

Delete a range of datapoints from multiple time series.

Parameters

ranges (list[dict[str, Any]]) – The list of datapoint ids along with time range to delete. See examples below.

Examples

Each element in the list ranges must specify either id or external_id, and a range:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ranges = [{"id": 1, "start": "2d-ago", "end": "now"},
...           {"external_id": "abc", "start": "2d-ago", "end": "now"}]
>>> c.time_series.data.delete_ranges(ranges)

Data Points Data classes

class cognite.client.data_classes.datapoints.Datapoint(timestamp: int | None = None, value: str | float | None = None, average: float | None = None, max: float | None = None, min: float | None = None, count: int | None = None, sum: float | None = None, interpolation: float | None = None, step_interpolation: float | None = None, continuous_variance: float | None = None, discrete_variance: float | None = None, total_variation: float | None = None)

Bases: CogniteResource

An object representing a datapoint.

Parameters
  • timestamp (int | None) – The data timestamp in milliseconds since the epoch (Jan 1, 1970). Can be negative to define a date before 1970. Minimum timestamp is 1900.01.01 00:00:00 UTC

  • value (str | float | None) – The data value. Can be string or numeric

  • average (float | None) – The integral average value in the aggregate period

  • max (float | None) – The maximum value in the aggregate period

  • min (float | None) – The minimum value in the aggregate period

  • count (int | None) – The number of datapoints in the aggregate period

  • sum (float | None) – The sum of the datapoints in the aggregate period

  • interpolation (float | None) – The interpolated value of the series in the beginning of the aggregate

  • step_interpolation (float | None) – The last value before or at the beginning of the aggregate.

  • continuous_variance (float | None) – The variance of the interpolated underlying function.

  • discrete_variance (float | None) – The variance of the datapoint values.

  • total_variation (float | None) – The total variation of the interpolated underlying function.

to_pandas(camel_case: bool = False) pandas.DataFrame

Convert the datapoint into a pandas DataFrame.

Parameters

camel_case (bool) – Convert column names to camel case (e.g. stepInterpolation instead of step_interpolation)

Returns

pandas.DataFrame

Return type

pandas.DataFrame
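
A short sketch (assuming a time series with id=1 exists): indexing the Datapoints object returned by retrieve_latest yields a single Datapoint, which can then be converted:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dp = c.time_series.data.retrieve_latest(id=1)[0]  # a single Datapoint
>>> df = dp.to_pandas()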

class cognite.client.data_classes.datapoints.Datapoints(id: int | None = None, external_id: str | None = None, is_string: bool | None = None, is_step: bool | None = None, unit: str | None = None, unit_external_id: str | None = None, granularity: str | None = None, timestamp: Sequence[int] | None = None, value: Sequence[str] | Sequence[float] | None = None, average: list[float] | None = None, max: list[float] | None = None, min: list[float] | None = None, count: list[int] | None = None, sum: list[float] | None = None, interpolation: list[float] | None = None, step_interpolation: list[float] | None = None, continuous_variance: list[float] | None = None, discrete_variance: list[float] | None = None, total_variation: list[float] | None = None, error: list[None | str] | None = None)

Bases: CogniteResource

An object representing a list of datapoints.

Parameters
  • id (int | None) – Id of the timeseries the datapoints belong to

  • external_id (str | None) – External id of the timeseries the datapoints belong to

  • is_string (bool | None) – Whether the time series is string valued or not.

  • is_step (bool | None) – Whether the time series is a step series or not.

  • unit (str | None) – The physical unit of the time series (free-text field).

  • unit_external_id (str | None) – The unit_external_id (as defined in the unit catalog) of the returned data points. If the datapoints were converted to a compatible unit, this will equal the converted unit, not the one defined on the time series.

  • granularity (str | None) – The granularity of the aggregate datapoints (does not apply to raw data)

  • timestamp (Sequence[int] | None) – The data timestamps in milliseconds since the epoch (Jan 1, 1970). Can be negative to define a date before 1970. Minimum timestamp is 1900.01.01 00:00:00 UTC

  • value (Sequence[str] | Sequence[float] | None) – The data values. Can be string or numeric

  • average (list[float] | None) – The integral average values in the aggregate period

  • max (list[float] | None) – The maximum values in the aggregate period

  • min (list[float] | None) – The minimum values in the aggregate period

  • count (list[int] | None) – The number of datapoints in the aggregate periods

  • sum (list[float] | None) – The sum of the datapoints in the aggregate periods

  • interpolation (list[float] | None) – The interpolated values of the series in the beginning of the aggregates

  • step_interpolation (list[float] | None) – The last values before or at the beginning of the aggregates.

  • continuous_variance (list[float] | None) – The variance of the interpolated underlying function.

  • discrete_variance (list[float] | None) – The variance of the datapoint values.

  • total_variation (list[float] | None) – The total variation of the interpolated underlying function.

  • error (list[None | str] | None) – No description.

dump(camel_case: bool = False) dict[str, Any]

Dump the datapoints into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representing the instance.

Return type

dict[str, Any]
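
For example (a minimal sketch, assuming a time series with id=1 exists):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.retrieve(id=1, limit=5)
>>> dumped = dps.dump(camel_case=True)  # keys like 'externalId' instead of 'external_id'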

to_pandas(column_names: str = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False, include_errors: bool = False) pandas.DataFrame

Convert the datapoints into a pandas DataFrame.

Parameters
  • column_names (str) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.

  • include_aggregate_name (bool) – Include aggregate in the column name

  • include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)

  • include_errors (bool) – For synthetic datapoint queries, include a column with errors.

Returns

The dataframe.

Return type

pandas.DataFrame
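
A small sketch (the external id “foo” is a made-up example) that drops the aggregate suffix from the column name:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.retrieve(
...     external_id="foo", aggregates=["average"], granularity="1h")
>>> df = dps.to_pandas(include_aggregate_name=False)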

class cognite.client.data_classes.datapoints.DatapointsArray(id: int | None = None, external_id: str | None = None, is_string: bool | None = None, is_step: bool | None = None, unit: str | None = None, unit_external_id: str | None = None, granularity: str | None = None, timestamp: NumpyDatetime64NSArray | None = None, value: NumpyFloat64Array | NumpyObjArray | None = None, average: NumpyFloat64Array | None = None, max: NumpyFloat64Array | None = None, min: NumpyFloat64Array | None = None, count: NumpyInt64Array | None = None, sum: NumpyFloat64Array | None = None, interpolation: NumpyFloat64Array | None = None, step_interpolation: NumpyFloat64Array | None = None, continuous_variance: NumpyFloat64Array | None = None, discrete_variance: NumpyFloat64Array | None = None, total_variation: NumpyFloat64Array | None = None)

Bases: CogniteResource

An object representing datapoints using numpy arrays.

dump(camel_case: bool = False, convert_timestamps: bool = False) dict[str, Any]

Dump the DatapointsArray into a json serializable Python data type.

Parameters
  • camel_case (bool) – Use camelCase for attribute names. Default: False.

  • convert_timestamps (bool) – Convert timestamps to ISO 8601 formatted strings. Default: False (returns as integer, milliseconds since epoch)

Returns

A dictionary representing the instance.

Return type

dict[str, Any]
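
An illustrative sketch (assuming a time series with id=1 exists); with convert_timestamps=True the dumped timestamps are ISO 8601 strings instead of integers:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> arr = client.time_series.data.retrieve_arrays(id=1, limit=5)
>>> dumped = arr.dump(convert_timestamps=True)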

to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame

Convert the DatapointsArray into a pandas DataFrame.

Parameters
  • column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.

  • include_aggregate_name (bool) – Include aggregate in the column name

  • include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)

Returns

The datapoints as a pandas DataFrame.

Return type

pandas.DataFrame

class cognite.client.data_classes.datapoints.DatapointsArrayList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[DatapointsArray]

concat_duplicate_ids() None

Concatenates all arrays with duplicated IDs.

Arrays with the same ids are stacked in chronological order.

Caveat: This method is not guaranteed to preserve the order of the list.
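
A hedged sketch of when this is useful: requesting the same id twice, e.g. for two disjoint time ranges, yields duplicated entries that can then be merged:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps_lst = client.time_series.data.retrieve_arrays(
...     id=[{"id": 42, "start": "4w-ago", "end": "2w-ago"},
...         {"id": 42, "start": "1w-ago"}])
>>> dps_lst.concat_duplicate_ids()  # in-place; stacks both arrays for id=42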

dump(camel_case: bool = False, convert_timestamps: bool = False) list[dict[str, Any]]

Dump the instance into a json serializable Python data type.

Parameters
  • camel_case (bool) – Use camelCase for attribute names. Default: False.

  • convert_timestamps (bool) – Convert timestamps to ISO 8601 formatted strings. Default: False (returns as integer, milliseconds since epoch)

Returns

A list of dicts representing the instance.

Return type

list[dict[str, Any]]

get(id: int | None = None, external_id: str | None = None) DatapointsArray | list[DatapointsArray] | None

Get a specific DatapointsArray from this list by id or external_id.

Note: For duplicated time series, returns a list of DatapointsArray.

Parameters
  • id (int | None) – The id of the item(s) to get.

  • external_id (str | None) – The external_id of the item(s) to get.

Returns

The requested item(s)

Return type

DatapointsArray | list[DatapointsArray] | None
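
For example, a sketch (assumes time series with ids 1 and 2 were fetched into the list):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps_lst = c.time_series.data.retrieve_arrays(id=[1, 2], start="1d-ago", end="now")
>>> arr = dps_lst.get(id=2)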

to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame

Convert the DatapointsArrayList into a pandas DataFrame.

Parameters
  • column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.

  • include_aggregate_name (bool) – Include aggregate in the column name

  • include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)

Returns

The datapoints as a pandas DataFrame.

Return type

pandas.DataFrame

class cognite.client.data_classes.datapoints.DatapointsList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[Datapoints]

get(id: int | None = None, external_id: str | None = None) Datapoints | list[Datapoints] | None

Get a specific Datapoints from this list by id or external_id.

Note: For duplicated time series, returns a list of Datapoints.

Parameters
  • id (int | None) – The id of the item(s) to get.

  • external_id (str | None) – The external_id of the item(s) to get.

Returns

The requested item(s)

Return type

Datapoints | list[Datapoints] | None

to_pandas(column_names: Literal['id', 'external_id'] = 'external_id', include_aggregate_name: bool = True, include_granularity_name: bool = False) pandas.DataFrame

Convert the datapoints list into a pandas DataFrame.

Parameters
  • column_names (Literal["id", "external_id"]) – Which field to use as column header. Defaults to “external_id”, can also be “id”. For time series with no external ID, ID will be used instead.

  • include_aggregate_name (bool) – Include aggregate in the column name

  • include_granularity_name (bool) – Include granularity in the column name (after aggregate if present)

Returns

The datapoints list as a pandas DataFrame.

Return type

pandas.DataFrame

class cognite.client.data_classes.datapoints.LatestDatapointQuery(id: int | None = None, external_id: str | None = None, before: None | int | str | datetime = None)

Bases: object

Parameters describing a query for the latest datapoint from a time series.

Note

Pass either ID or external ID.

Parameters
  • id (Optional[int]) – The internal ID of the time series to query.

  • external_id (Optional[str]) – The external ID of the time series to query.

  • before (Union[None, int, str, datetime]) – Get latest datapoint before this time. None means ‘now’.
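
A minimal sketch of typical usage (assumes a time series with id 1, and that the query object is passed to the datapoints retrieve_latest call):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LatestDatapointQuery
>>> c = CogniteClient()
>>> res = c.time_series.data.retrieve_latest(id=LatestDatapointQuery(id=1, before="2d-ago"))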

Files

Retrieve file metadata by id

FilesAPI.retrieve(id: int | None = None, external_id: str | None = None) FileMetadata | None

Retrieve a single file metadata by id.

Parameters
  • id (int | None) – ID

  • external_id (str | None) – External ID

Returns

Requested file metadata or None if it does not exist.

Return type

FileMetadata | None

Examples

Get file metadata by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve(id=1)

Get file metadata by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve(external_id="1")

Retrieve multiple files’ metadata by id

FilesAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) FileMetadataList

Retrieve metadata for multiple files by id.

Parameters
  • ids (Sequence[int] | None) – IDs

  • external_ids (Sequence[str] | None) – External IDs

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested file metadata objects.

Return type

FileMetadataList

Examples

Get file metadata by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve_multiple(ids=[1, 2, 3])

Get file metadata by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.retrieve_multiple(external_ids=["abc", "def"])

List files metadata

FilesAPI.list(name: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, source_created_time: dict[str, Any] | TimestampRange | None = None, source_modified_time: dict[str, Any] | TimestampRange | None = None, uploaded_time: dict[str, Any] | TimestampRange | None = None, external_id_prefix: str | None = None, directory_prefix: str | None = None, uploaded: bool | None = None, limit: int | None = 25) FileMetadataList

List files

Parameters
  • name (str | None) – Name of the file.

  • mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value

  • asset_ids (Sequence[int] | None) – Only include files that reference these specific asset IDs.

  • asset_external_ids (Sequence[str] | None) – Only include files that reference these specific asset external IDs.

  • asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.

  • asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.

  • data_set_ids (int | Sequence[int] | None) – Return only files in the specified data set(s) with this id / these ids.

  • data_set_external_ids (str | Sequence[str] | None) – Return only files in the specified data set(s) with this external id / these external ids.

  • labels (LabelFilter | None) – Return only the files matching the specified label filter(s).

  • geo_location (GeoLocationFilter | None) – Only include files matching the specified geographic relation.

  • source (str | None) – The source of this file.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • source_created_time (dict[str, Any] | TimestampRange | None) – Filter for files where the sourceCreatedTime field has been set and is within the specified range.

  • source_modified_time (dict[str, Any] | TimestampRange | None) – Filter for files where the sourceModifiedTime field has been set and is within the specified range.

  • uploaded_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps

  • external_id_prefix (str | None) – External Id provided by client. Should be unique within the project.

  • directory_prefix (str | None) – Filter by this (case-sensitive) prefix for the directory provided by the client.

  • uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API; it has no effect in a post body.

  • limit (int | None) – Max number of files to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

The requested files.

Return type

FileMetadataList

Examples

List files metadata and filter on external id prefix:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_list = c.files.list(limit=5, external_id_prefix="prefix")

Iterate over files metadata:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for file_metadata in c.files:
...     file_metadata # do something with the file metadata

Iterate over chunks of files metadata to reduce memory load:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for file_list in c.files(chunk_size=2500):
...     file_list # do something with the files

Filter files based on labels:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["WELL LOG", "VERIFIED"])
>>> file_list = c.files.list(labels=my_label_filter)

Filter files based on geoLocation:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import GeoLocationFilter, GeometryFilter
>>> c = CogniteClient()
>>> my_geo_location_filter = GeoLocationFilter(relation="intersects", shape=GeometryFilter(type="Point", coordinates=[35,10]))
>>> file_list = c.files.list(geo_location=my_geo_location_filter)

Aggregate files metadata

FilesAPI.aggregate(filter: FileMetadataFilter | dict | None = None) list[FileAggregate]

Aggregate files

Parameters

filter (FileMetadataFilter | dict | None) – Filter on file metadata with exact match

Returns

List of file aggregates

Return type

list[FileAggregate]

Examples

Count the number of uploaded files:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> aggregate_uploaded = c.files.aggregate(filter={"uploaded": True})

Search for files

FilesAPI.search(name: str | None = None, filter: FileMetadataFilter | dict | None = None, limit: int = 25) FileMetadataList

Search for files. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.

Parameters
  • name (str | None) – Prefix and fuzzy search on name.

  • filter (FileMetadataFilter | dict | None) – Filter to apply. Performs exact match on these fields.

  • limit (int) – Max number of results to return.

Returns

List of requested files metadata.

Return type

FileMetadataList

Examples

Search for a file:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.search(name="some name")

Search for a file with an attached label:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import LabelFilter, FileMetadataFilter
>>> c = CogniteClient()
>>> my_label_filter = LabelFilter(contains_all=["WELL LOG"])
>>> res = c.files.search(name="xyz", filter=FileMetadataFilter(labels=my_label_filter))

Create file metadata

FilesAPI.create(file_metadata: FileMetadata, overwrite: bool = False) tuple[FileMetadata, str]

Create file without uploading content.

Parameters
  • file_metadata (FileMetadata) – File metadata for the file to create.

  • overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies an ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.

Returns

Tuple containing the file metadata and upload url of the created file.

Return type

tuple[FileMetadata, str]

Examples

Create a file:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadata
>>> c = CogniteClient()
>>> file_metadata = FileMetadata(name="MyFile")
>>> res = c.files.create(file_metadata)
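
Since create returns both the metadata and an upload url, the result can be unpacked; a sketch (the external id is illustrative):

>>> file_metadata, upload_url = c.files.create(FileMetadata(name="MyFile", external_id="my_file"))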

Upload a file or directory

FilesAPI.upload(path: str, external_id: str | None = None, name: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, security_categories: Sequence[int] | None = None, recursive: bool = False, overwrite: bool = False) FileMetadata | FileMetadataList

Upload a file

Parameters
  • path (str) – Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.

  • external_id (str | None) – The external ID provided by the client. Must be unique within the project.

  • name (str | None) – Name of the file.

  • source (str | None) – The source of the file.

  • mime_type (str | None) – File type. E.g. text/plain, application/pdf, …

  • metadata (dict[str, str] | None) – Customizable extra data about the file. String key -> String value.

  • directory (str | None) – The directory to be associated with this file. Must be an absolute, unix-style path.

  • asset_ids (Sequence[int] | None) – The IDs of the assets related to this file.

  • source_created_time (int | None) – The timestamp for when the file was originally created in the source system.

  • source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.

  • data_set_id (int | None) – ID of the data set.

  • labels (Sequence[Label] | None) – A list of the labels associated with this resource item.

  • geo_location (GeoLocation | None) – The geographic metadata of the file.

  • security_categories (Sequence[int] | None) – Security categories to attach to this file.

  • recursive (bool) – If path is a directory, upload all contained files recursively.

  • overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies an ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. The actual file will be overwritten after successful upload. If there is no successful upload, the current file contents will be kept. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.

Returns

The file metadata of the uploaded file(s).

Return type

FileMetadata | FileMetadataList

Examples

Upload a file in a given path:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file", name="my_file")

If name is omitted, this method will use the name of the file:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file")

You can also upload all files in a directory by setting path to the path of a directory:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/my/directory")

Upload a file with a label:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Label
>>> c = CogniteClient()
>>> res = c.files.upload("/path/to/file", name="my_file", labels=[Label(external_id="WELL LOG")])

Upload a file with a geo_location:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import GeoLocation, Geometry
>>> c = CogniteClient()
>>> geometry = Geometry(type="LineString", coordinates=[[30, 10], [10, 30], [40, 40]])
>>> res = c.files.upload("/path/to/file", geo_location=GeoLocation(type="Feature", geometry=geometry))

Upload a string or bytes

FilesAPI.upload_bytes(content: str | bytes | TextIO | BinaryIO, name: str, external_id: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, security_categories: Sequence[int] | None = None, overwrite: bool = False) FileMetadata

Upload bytes or string.

You can also pass a file handle to content.

Parameters
  • content (str | bytes | TextIO | BinaryIO) – The content to upload.

  • name (str) – Name of the file.

  • external_id (str | None) – The external ID provided by the client. Must be unique within the project.

  • source (str | None) – The source of the file.

  • mime_type (str | None) – File type. E.g. text/plain, application/pdf,…

  • metadata (dict[str, str] | None) – Customizable extra data about the file. String key -> String value.

  • directory (str | None) – The directory to be associated with this file. Must be an absolute, unix-style path.

  • asset_ids (Sequence[int] | None) – The IDs of the assets related to this file.

  • data_set_id (int | None) – Id of the data set.

  • labels (Sequence[Label] | None) – A list of the labels associated with this resource item.

  • geo_location (GeoLocation | None) – The geographic metadata of the file.

  • source_created_time (int | None) – The timestamp for when the file was originally created in the source system.

  • source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.

  • security_categories (Sequence[int] | None) – Security categories to attach to this file.

  • overwrite (bool) – If ‘overwrite’ is set to true, and the POST body content specifies an ‘externalId’ field, fields for the file found for externalId can be overwritten. The default setting is false. If metadata is included in the request body, all of the original metadata will be overwritten. The actual file will be overwritten after successful upload. If there is no successful upload, the current file contents will be kept. File-Asset mappings only change if explicitly stated in the assetIds field of the POST json body. Do not set assetIds in request body if you want to keep the current file-asset mappings.

Returns

The file metadata of the uploaded file.

Return type

FileMetadata

Examples

Upload a file from memory:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.files.upload_bytes(b"some content", name="my_file", asset_ids=[1,2,3])

Retrieve download urls

FilesAPI.retrieve_download_urls(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, extended_expiration: bool = False) dict[int | str, str]

Get download links by id or external id

Parameters
  • id (int | Sequence[int] | None) – Id or list of ids.

  • external_id (str | Sequence[str] | None) – External id or list of external ids.

  • extended_expiration (bool) – Extend expiration time of download url to 1 hour. Defaults to false.

Returns

Dictionary containing download urls.

Return type

dict[int | str, str]
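
Examples

Get download urls for files by id and external id (a sketch; the ids are illustrative):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> urls = c.files.retrieve_download_urls(id=[1, 2], external_id="abc")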

Download files to disk

FilesAPI.download(directory: str | Path, id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, keep_directory_structure: bool = False, resolve_duplicate_file_names: bool = False) None

Download files by id or external id.

This method streams all files to disk, never keeping more than 2MB in memory per worker. The files are stored in the provided directory using the file names retrieved from the file metadata in CDF. You can also choose to keep the directory structure from CDF, so that files are stored in subdirectories matching their directory attribute; files without a directory attribute are placed in the given (root) directory. By default, when several files would be written to the same local path, only one of them is kept. Use the resolve_duplicate_file_names argument to instead append a number to each duplicate file name.

Warning

If you are downloading several files at once, be aware that file name collisions lead to all-but-one of the files missing. A warning is issued when this happens, listing the affected files.

Parameters
  • directory (str | Path) – Directory to download the file(s) to.

  • id (int | Sequence[int] | None) – Id or list of ids

  • external_id (str | Sequence[str] | None) – External ID or list of external ids.

  • keep_directory_structure (bool) – Whether or not to keep the directory hierarchy in CDF, creating subdirectories as needed below the given directory.

  • resolve_duplicate_file_names (bool) – Whether or not to resolve duplicate file names by appending a number to them

Examples

Download files by id and external id into directory ‘my_directory’:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.download(directory="my_directory", id=[1,2,3], external_id=["abc", "def"])

Download files by id to the current directory:

>>> c.files.download(directory=".", id=[1,2,3])
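
Download files while keeping the directory structure from CDF and resolving duplicate file names (a sketch; the external ids are illustrative):

>>> c.files.download(
...     directory="my_directory",
...     external_id=["abc", "def"],
...     keep_directory_structure=True,
...     resolve_duplicate_file_names=True)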

Download a single file to a specific path

FilesAPI.download_to_path(path: Path | str, id: int | None = None, external_id: str | None = None) None

Download a file to a specific target.

Parameters
  • path (Path | str) – The path in which to place the file.

  • id (int | None) – Id of the file to download.

  • external_id (str | None) – External id of the file to download.

Examples

Download a file by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.download_to_path("~/mydir/my_downloaded_file.txt", id=123)

Download a file as bytes

FilesAPI.download_bytes(id: int | None = None, external_id: str | None = None) bytes

Download a file as bytes.

Parameters
  • id (int | None) – Id of the file

  • external_id (str | None) – External id of the file

Returns

The file contents.

Return type

bytes

Examples

Download a file’s content into memory:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_content = c.files.download_bytes(id=1)

Delete files

FilesAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None) None

Delete files

Parameters
  • id (int | Sequence[int] | None) – Id or list of ids

  • external_id (str | Sequence[str] | None) – External id or list of external ids

Examples

Delete files by id or external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.files.delete(id=[1,2,3], external_id="3")

Update files metadata

FilesAPI.update(item: FileMetadata | FileMetadataUpdate) FileMetadata
FilesAPI.update(item: Sequence[FileMetadata | FileMetadataUpdate]) FileMetadataList

Update files. Currently, a full replacement of labels on a file is not supported (only partial add/remove updates). See the example below on how to perform a partial labels update.

Parameters

item (FileMetadata | FileMetadataUpdate | Sequence[FileMetadata | FileMetadataUpdate]) – file(s) to update.

Returns

The updated files.

Return type

FileMetadata | FileMetadataList

Examples

Update file metadata that you have fetched. This will perform a full update of the file metadata:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> file_metadata = c.files.retrieve(id=1)
>>> file_metadata.description = "New description"
>>> res = c.files.update(file_metadata)

Perform a partial update on file metadata, updating the source and adding a new field to metadata:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).source.set("new source").metadata.add({"key": "value"})
>>> res = c.files.update(my_update)

Attach labels to a file:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).labels.add(["PUMP", "VERIFIED"])
>>> res = c.files.update(my_update)

Detach a single label from a file:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import FileMetadataUpdate
>>> c = CogniteClient()
>>> my_update = FileMetadataUpdate(id=1).labels.remove("PUMP")
>>> res = c.files.update(my_update)

Files Data classes

class cognite.client.data_classes.files.FileAggregate(count: int | None = None, **kwargs: Any)

Bases: dict

Aggregation results for files

Parameters
  • count (int | None) – Number of filtered items included in aggregation

  • **kwargs (Any) – No description.

class cognite.client.data_classes.files.FileMetadata(external_id: str | None = None, name: str | None = None, source: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, directory: str | None = None, asset_ids: Sequence[int] | None = None, data_set_id: int | None = None, labels: Sequence[Label] | None = None, geo_location: GeoLocation | None = None, source_created_time: int | None = None, source_modified_time: int | None = None, security_categories: Sequence[int] | None = None, id: int | None = None, uploaded: bool | None = None, uploaded_time: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

Metadata about a file in CDF.

Parameters
  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

  • name (str | None) – Name of the file.

  • source (str | None) – The source of the file.

  • mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.

  • directory (str | None) – Directory associated with the file. Must be an absolute, unix-style path.

  • asset_ids (Sequence[int] | None) – The IDs of the assets related to this file.

  • data_set_id (int | None) – The dataSet Id for the item.

  • labels (Sequence[Label] | None) – A list of the labels associated with this resource item.

  • geo_location (GeoLocation | None) – The geographic metadata of the file.

  • source_created_time (int | None) – The timestamp for when the file was originally created in the source system.

  • source_modified_time (int | None) – The timestamp for when the file was last modified in the source system.

  • security_categories (Sequence[int] | None) – The security category IDs required to access this file.

  • id (int | None) – A server-generated ID for the object.

  • uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API; it has no effect in a post body.

  • uploaded_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • cognite_client (CogniteClient | None) – The client to associate with this object.

class cognite.client.data_classes.files.FileMetadataFilter(name: str | None = None, mime_type: str | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, labels: LabelFilter | None = None, geo_location: GeoLocationFilter | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, source: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, uploaded_time: dict[str, Any] | TimestampRange | None = None, source_created_time: dict[str, Any] | None = None, source_modified_time: dict[str, Any] | None = None, external_id_prefix: str | None = None, directory_prefix: str | None = None, uploaded: bool | None = None)

Bases: CogniteFilter

Filter on file metadata with exact match.

Parameters
  • name (str | None) – Name of the file.

  • mime_type (str | None) – File type. E.g. text/plain, application/pdf, ..

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.

  • asset_ids (Sequence[int] | None) – Only include files that reference these specific asset IDs.

  • asset_external_ids (Sequence[str] | None) – Only include files that reference these specific asset external IDs.

  • data_set_ids (Sequence[dict[str, Any]] | None) – Only include files that belong to these datasets.

  • labels (LabelFilter | None) – Return only the files matching the specified label(s).

  • geo_location (GeoLocationFilter | None) – Only include files matching the specified geographic relation.

  • asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include files that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.

  • source (str | None) – The source of this file.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • uploaded_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • source_created_time (dict[str, Any] | None) – Filter for files where the sourceCreatedTime field has been set and is within the specified range.

  • source_modified_time (dict[str, Any] | None) – Filter for files where the sourceModifiedTime field has been set and is within the specified range.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • directory_prefix (str | None) – Filter by this (case-sensitive) prefix for the directory provided by the client.

  • uploaded (bool | None) – Whether or not the actual file is uploaded. This field is returned only by the API; it has no effect in a post body.

dump(camel_case: bool = False) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]
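
Examples

Build a filter and inspect its API representation (a sketch; the field values are illustrative):

>>> from cognite.client.data_classes import FileMetadataFilter
>>> flt = FileMetadataFilter(mime_type="application/pdf", uploaded=True)
>>> flt.dump(camel_case=True)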

class cognite.client.data_classes.files.FileMetadataList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[FileMetadata], IdTransformerMixin

class cognite.client.data_classes.files.FileMetadataUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

Changes will be applied to file.

Geospatial

Note

Check https://github.com/cognitedata/geospatial-examples for some complete examples.

Create feature types

GeospatialAPI.create_feature_types(feature_type: FeatureType) FeatureType
GeospatialAPI.create_feature_types(feature_type: Sequence[FeatureType]) FeatureTypeList

Creates feature types <https://developer.cognite.com/api#tag/Geospatial/operation/createFeatureTypes>

Parameters

feature_type (FeatureType | Sequence[FeatureType]) – feature type definition or list of feature type definitions to create.

Returns

Created feature type definition(s)

Return type

FeatureType | FeatureTypeList

Examples

Create new type definitions:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureType
>>> c = CogniteClient()
>>> feature_types = [
...     FeatureType(external_id="wells", properties={"location": {"type": "POINT", "srid": 4326}}),
...     FeatureType(
...       external_id="cities",
...       properties={"name": {"type": "STRING", "size": 10}},
...       search_spec={"name_index": {"properties": ["name"]}}
...     )
... ]
>>> res = c.geospatial.create_feature_types(feature_types)

Delete feature types

GeospatialAPI.delete_feature_types(external_id: str | Sequence[str], recursive: bool = False) None

Delete one or more feature types <https://developer.cognite.com/api#tag/Geospatial/operation/GeospatialDeleteFeatureTypes>

Parameters
  • external_id (str | Sequence[str]) – External ID or list of external ids

  • recursive (bool) – if true, the features of the deleted types will also be dropped

Examples

Delete feature type definitions by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.delete_feature_types(external_id=["wells", "cities"])

List feature types

GeospatialAPI.list_feature_types() FeatureTypeList

List feature types <https://developer.cognite.com/api#tag/Geospatial/operation/listFeatureTypes>

Returns

List of feature types

Return type

FeatureTypeList

Examples

Iterate over feature type definitions:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for feature_type in c.geospatial.list_feature_types():
...     feature_type # do something with the feature type definition

Retrieve feature types

GeospatialAPI.retrieve_feature_types(external_id: str) FeatureType
GeospatialAPI.retrieve_feature_types(external_id: list[str]) FeatureTypeList

Retrieve feature types <https://developer.cognite.com/api#tag/Geospatial/operation/getFeatureTypesByIds>

Parameters

external_id (str | list[str]) – External ID

Returns

The requested feature type definition(s).

Return type

FeatureType | FeatureTypeList

Examples

Get feature type by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.geospatial.retrieve_feature_types(external_id="1")

Update feature types

GeospatialAPI.patch_feature_types(patch: FeatureTypePatch | Sequence[FeatureTypePatch]) FeatureTypeList

Patch feature types <https://developer.cognite.com/api#tag/Geospatial/operation/updateFeatureTypes>

Parameters

patch (FeatureTypePatch | Sequence[FeatureTypePatch]) – the patch to apply

Returns

The patched feature types.

Return type

FeatureTypeList

Examples

Add one property to a feature type and add indexes:

>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.geospatial.patch_feature_types(
...    patch=FeatureTypePatch(
...       external_id="wells",
...       property_patches=Patches(add={"altitude": {"type": "DOUBLE"}}),
...       search_spec_patches=Patches(
...         add={
...           "altitude_idx": {"properties": ["altitude"]},
...           "composite_idx": {"properties": ["location", "altitude"]}
...         }
...       )
...    )
... )

Add an additional index to an existing property:

>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.geospatial.patch_feature_types(
...    patch=FeatureTypePatch(
...         external_id="wells",
...         search_spec_patches=Patches(add={"location_idx": {"properties": ["location"]}})
... ))

Create features

GeospatialAPI.create_features(feature_type_external_id: str, feature: Feature, allow_crs_transformation: bool = False, chunk_size: int | None = None) Feature
GeospatialAPI.create_features(feature_type_external_id: str, feature: Sequence[Feature] | FeatureList, allow_crs_transformation: bool = False, chunk_size: int | None = None) FeatureList

Creates features <https://developer.cognite.com/api#tag/Geospatial/operation/createFeatures>

Parameters
  • feature_type_external_id (str) – External id of the feature type for the features to create.

  • feature (Feature | Sequence[Feature] | FeatureList) – one feature or a list of features to create or a FeatureList object

  • allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.

  • chunk_size (int | None) – maximum number of items in a single request to the api

Returns

Created features

Return type

Feature | FeatureList

Examples

Create a new feature type and corresponding feature:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureType, Feature
>>> c = CogniteClient()
>>> feature_types = [
...     FeatureType(
...         external_id="my_feature_type",
...         properties={
...             "location": {"type": "POINT", "srid": 4326},
...             "temperature": {"type": "DOUBLE"}
...         }
...     )
... ]
>>> res = c.geospatial.create_feature_types(feature_types)
>>> res = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(
...         external_id="my_feature",
...         location={"wkt": "POINT(1 1)"},
...         temperature=12.4
...     )
... )

Delete features

GeospatialAPI.delete_features(feature_type_external_id: str, external_id: str | Sequence[str] | None = None) None

Delete one or more features <https://developer.cognite.com/api#tag/Geospatial/operation/deleteFeatures>

Parameters
  • feature_type_external_id (str) – External id of the feature type of the features to delete.

  • external_id (str | Sequence[str] | None) – External ID or list of external ids

Examples

Delete features by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.delete_features(
...     feature_type_external_id="my_feature_type",
...     external_id="my_feature"
... )

Retrieve features

GeospatialAPI.retrieve_features(feature_type_external_id: str, external_id: str, properties: dict[str, Any] | None = None) Feature
GeospatialAPI.retrieve_features(feature_type_external_id: str, external_id: list[str], properties: dict[str, Any] | None = None) FeatureList

Retrieve features <https://developer.cognite.com/api#tag/Geospatial/operation/getFeaturesByIds>

Parameters
  • feature_type_external_id (str) – External id of the feature type of the features to retrieve.

  • external_id (str | list[str]) – External ID or list of external ids

  • properties (dict[str, Any] | None) – the output property selection

Returns

The requested feature(s).

Return type

FeatureList | Feature

Examples

Retrieve one feature by its external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.geospatial.retrieve_features(
...     feature_type_external_id="my_feature_type",
...     external_id="my_feature"
... )

Update features

GeospatialAPI.update_features(feature_type_external_id: str, feature: Feature | Sequence[Feature], allow_crs_transformation: bool = False, chunk_size: int | None = None) FeatureList

Update features <https://developer.cognite.com/api#tag/Geospatial/operation/updateFeatures>

Parameters
  • feature_type_external_id (str) – External id of the feature type of the features to update.

  • feature (Feature | Sequence[Feature]) – feature or list of features.

  • allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.

  • chunk_size (int | None) – maximum number of items in a single request to the api

Returns

Updated features

Return type

FeatureList

Examples

Update one feature:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> my_updated_feature = c.geospatial.update_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=6.237)
... )

List features

GeospatialAPI.list_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, limit: int | None = 25, allow_crs_transformation: bool = False) FeatureList

List features <https://developer.cognite.com/api#tag/Geospatial/operation/listFeatures>

This method allows filtering over all features.

Parameters
  • feature_type_external_id (str) – the feature type to list features for

  • filter (dict[str, Any] | None) – the list filter

  • properties (dict[str, Any] | None) – the output property selection

  • limit (int | None) – Maximum number of features to return. Defaults to 25. Set to -1, float(“inf”) or None to return all features.

  • allow_crs_transformation (bool) – If true, then input geometries if existing in the filter will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.

Returns

The filtered features

Return type

FeatureList

Examples

List features:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature_type = c.geospatial.retrieve_feature_types(
...     external_id="my_feature_type"
... )
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id=my_feature_type.external_id,
...     feature=Feature(
...         external_id="my_feature",
...         temperature=12.4,
...         location={"wkt": "POINT(0 1)"}
...     )
... )
>>> res = c.geospatial.list_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in res:
...     f # do something with the features

List features and select output properties:

>>> res = c.geospatial.list_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )

List features with spatial filters:

>>> res = c.geospatial.list_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"stWithin": {
...         "property": "location",
...         "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...     }}
... )

Search features

GeospatialAPI.search_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, limit: int = 25, order_by: Sequence[OrderSpec] | None = None, allow_crs_transformation: bool = False) FeatureList

Search for features <https://developer.cognite.com/api#tag/Geospatial/operation/searchFeatures>

This method allows ordering the results by one or more properties of the feature type. However, the number of items returned is limited to 1000 and there is no support for cursors yet. If you need to return more than 1000 items, use the stream_features(…) method instead.

Parameters
  • feature_type_external_id (str) – The feature type to search for

  • filter (dict[str, Any] | None) – The search filter

  • properties (dict[str, Any] | None) – The output property selection

  • limit (int) – Maximum number of results

  • order_by (Sequence[OrderSpec] | None) – The order specification

  • allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.

Returns

the filtered features

Return type

FeatureList

Examples

Search for features:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature, OrderSpec
>>> c = CogniteClient()
>>> my_feature_type = c.geospatial.retrieve_feature_types(
...     external_id="my_feature_type"
... )
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id=my_feature_type.external_id,
...     feature=Feature(
...         external_id="my_feature",
...         temperature=12.4,
...         location={"wkt": "POINT(0 1)"}
...     )
... )
>>> res = c.geospatial.search_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in res:
...     f # do something with the features

Search for features and select output properties:

>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )

Search for features and order results:

>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={},
...     order_by=[
...         OrderSpec("temperature", "ASC"),
...         OrderSpec("pressure", "DESC")]
... )

Search for features with spatial filters:

>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"stWithin": {
...         "property": "location",
...         "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...     }}
... )

Combining multiple filters:

>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"and": [
...         {"range": {"property": "temperature", "gt": 12.0}},
...         {"stWithin": {
...             "property": "location",
...             "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...         }}
...     ]}
... )
>>> res = c.geospatial.search_features(
...     feature_type_external_id=my_feature_type.external_id,
...     filter={"or": [
...         {"range": {"property": "temperature", "gt": 12.0}},
...         {"stWithin": {
...             "property": "location",
...             "value": {"wkt": "POLYGON((0 0, 0 1, 1 1, 0 0))"}
...         }}
...     ]}
... )

Stream features

GeospatialAPI.stream_features(feature_type_external_id: str, filter: dict[str, Any] | None = None, properties: dict[str, Any] | None = None, allow_crs_transformation: bool = False) Iterator[Feature]

Stream features <https://developer.cognite.com/api#tag/Geospatial/operation/searchFeaturesStreaming>

This method can return any number of items, until the underlying API call times out. The order of the result items is not deterministic. If you need to order the results, use the search_features(…) method instead.

Parameters
  • feature_type_external_id (str) – the feature type to search for

  • filter (dict[str, Any] | None) – the search filter

  • properties (dict[str, Any] | None) – the output property selection

  • allow_crs_transformation (bool) – If true, then input geometries will be transformed into the Coordinate Reference System defined in the feature type specification. When it is false, then requests with geometries in Coordinate Reference System different from the ones defined in the feature type will result in CogniteAPIError exception.

Yields

Feature – a generator for the filtered features

Examples

Stream features:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> features = c.geospatial.stream_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}}
... )
>>> for f in features:
...     f # do something with the features

Stream features and select output properties:

>>> features = c.geospatial.stream_features(
...     feature_type_external_id="my_feature_type",
...     filter={},
...     properties={"temperature": {}, "pressure": {}}
... )
>>> for f in features:
...     f # do something with the features

Aggregate features

GeospatialAPI.aggregate_features(feature_type_external_id: str, property: str | None = None, aggregates: Sequence[str] | None = None, filter: dict[str, Any] | None = None, group_by: Sequence[str] | None = None, order_by: Sequence[OrderSpec] | None = None, output: dict[str, Any] | None = None) FeatureAggregateList

Aggregate filtered features <https://developer.cognite.com/api#tag/Geospatial/operation/aggregateFeatures>

Parameters
  • feature_type_external_id (str) – the feature type to filter features from

  • property (str | None) – the property for which aggregates should be calculated

  • aggregates (Sequence[str] | None) – list of aggregates to be calculated

  • filter (dict[str, Any] | None) – the search filter

  • group_by (Sequence[str] | None) – list of properties to group by with

  • order_by (Sequence[OrderSpec] | None) – the order specification

  • output (dict[str, Any] | None) – the aggregate output

Returns

the computed feature aggregates

Return type

FeatureAggregateList

Examples

Aggregate property of features:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import Feature, OrderSpec
>>> c = CogniteClient()
>>> my_feature = c.geospatial.create_features(
...     feature_type_external_id="my_feature_type",
...     feature=Feature(external_id="my_feature", temperature=12.4)
... )
>>> res_deprecated = c.geospatial.aggregate_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}},
...     property="temperature",
...     aggregates=["max", "min"],
...     group_by=["category"],
...     order_by=[OrderSpec("category", "ASC")]
... ) # deprecated
>>> res = c.geospatial.aggregate_features(
...     feature_type_external_id="my_feature_type",
...     filter={"range": {"property": "temperature", "gt": 12.0}},
...     group_by=["category"],
...     order_by=[OrderSpec("category", "ASC")],
...     output={"min_temperature": {"min": {"property": "temperature"}},
...         "max_volume": {"max": {"property": "volume"}}
...     }
... )
>>> for a in res:
...     a # loop over the aggregates for the different groups

Get coordinate reference systems

GeospatialAPI.get_coordinate_reference_systems(srids: int | Sequence[int]) CoordinateReferenceSystemList

Get Coordinate Reference Systems <https://developer.cognite.com/api#tag/Geospatial/operation/getCoordinateReferenceSystem>

Parameters

srids (int | Sequence[int]) – SRID or list of SRIDs

Returns

Requested CRSs.

Return type

CoordinateReferenceSystemList

Examples

Get two CRS definitions:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> crs = c.geospatial.get_coordinate_reference_systems(srids=[4326, 4327])

List coordinate reference systems

GeospatialAPI.list_coordinate_reference_systems(only_custom: bool = False) CoordinateReferenceSystemList

List Coordinate Reference Systems <https://developer.cognite.com/api#tag/Geospatial/operation/listGeospatialCoordinateReferenceSystems>

Parameters

only_custom (bool) – whether to list only custom CRSs

Returns

list of CRSs.

Return type

CoordinateReferenceSystemList

Examples

Fetch all custom CRSs:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> crs = c.geospatial.list_coordinate_reference_systems(only_custom=True)

Create coordinate reference systems

GeospatialAPI.create_coordinate_reference_systems(crs: CoordinateReferenceSystem | Sequence[CoordinateReferenceSystem]) CoordinateReferenceSystemList

Create Coordinate Reference System <https://developer.cognite.com/api#tag/Geospatial/operation/createGeospatialCoordinateReferenceSystems>

Parameters

crs (CoordinateReferenceSystem | Sequence[CoordinateReferenceSystem]) – a CoordinateReferenceSystem or a list of CoordinateReferenceSystem

Returns

list of CRSs.

Return type

CoordinateReferenceSystemList

Examples

Create a custom CRS:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> from cognite.client.data_classes.geospatial import CoordinateReferenceSystem
>>> custom_crs = CoordinateReferenceSystem(
...     srid=121111,
...     wkt=(
...          'PROJCS["NTF (Paris) / Lambert zone II",'
...          ' GEOGCS["NTF (Paris)",'
...          '  DATUM["Nouvelle_Triangulation_Francaise_Paris",'
...          '   SPHEROID["Clarke 1880 (IGN)",6378249.2,293.4660212936265,'
...          '    AUTHORITY["EPSG","7011"]],'
...          '   TOWGS84[-168,-60,320,0,0,0,0],'
...          '   AUTHORITY["EPSG","6807"]],'
...          '  PRIMEM["Paris",2.33722917,'
...          '   AUTHORITY["EPSG","8903"]],'
...          '  UNIT["grad",0.01570796326794897,'
...          '   AUTHORITY["EPSG","9105"]], '
...          '  AUTHORITY["EPSG","4807"]],'
...          ' PROJECTION["Lambert_Conformal_Conic_1SP"],'
...          ' PARAMETER["latitude_of_origin",52],'
...          ' PARAMETER["central_meridian",0],'
...          ' PARAMETER["scale_factor",0.99987742],'
...          ' PARAMETER["false_easting",600000],'
...          ' PARAMETER["false_northing",2200000],'
...          ' UNIT["metre",1,'
...          '  AUTHORITY["EPSG","9001"]],'
...          ' AXIS["X",EAST],'
...          ' AXIS["Y",NORTH],'
...          ' AUTHORITY["EPSG","27572"]]'
...     ),
...     proj_string=(
...          '+proj=lcc +lat_1=46.8 +lat_0=46.8 +lon_0=0 +k_0=0.99987742 '
...          '+x_0=600000 +y_0=2200000 +a=6378249.2 +b=6356515 '
...          '+towgs84=-168,-60,320,0,0,0,0 +pm=paris +units=m +no_defs'
...     )
... )
>>> crs = c.geospatial.create_coordinate_reference_systems(custom_crs)

Put raster data

GeospatialAPI.put_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str, raster_format: str, raster_srid: int, file: str, allow_crs_transformation: bool = False, raster_scale_x: float | None = None, raster_scale_y: float | None = None) RasterMetadata

Put raster <https://developer.cognite.com/api#tag/Geospatial/operation/putRaster>

Parameters
  • feature_type_external_id (str) – external id of the feature type

  • feature_external_id (str) – external id of the feature

  • raster_property_name (str) – the raster property name

  • raster_format (str) – the raster input format

  • raster_srid (int) – the associated SRID for the raster

  • file (str) – the path to the file of the raster

  • allow_crs_transformation (bool) – When the parameter is false, requests with rasters in a Coordinate Reference System different from the one defined in the feature type will result in a bad request response code.

  • raster_scale_x (float | None) – the X component of the pixel width in units of coordinate reference system

  • raster_scale_y (float | None) – the Y component of the pixel height in units of coordinate reference system

Returns

the raster metadata if it was ingested successfully

Return type

RasterMetadata

Examples

Put a raster in a feature raster property:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> file = ...  # path to the raster file
>>> metadata = c.geospatial.put_raster(feature_type.external_id, feature.external_id,
...         raster_property_name, "XYZ", 3857, file)

Delete raster data

GeospatialAPI.delete_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str) None

Delete raster <https://developer.cognite.com/api#tag/Geospatial/operation/deleteRaster>

Parameters
  • feature_type_external_id (str) – external id of the feature type

  • feature_external_id (str) – external id of the feature

  • raster_property_name (str) – the raster property name

Examples

Delete a raster in a feature raster property:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> c.geospatial.delete_raster(feature_type.external_id, feature.external_id, raster_property_name)

Get raster data

GeospatialAPI.get_raster(feature_type_external_id: str, feature_external_id: str, raster_property_name: str, raster_format: str, raster_options: dict[str, Any] | None = None, raster_srid: int | None = None, raster_scale_x: float | None = None, raster_scale_y: float | None = None, allow_crs_transformation: bool = False) bytes

Get raster <https://developer.cognite.com/api#tag/Geospatial/operation/getRaster>

Parameters
  • feature_type_external_id (str) – external id of the feature type

  • feature_external_id (str) – external id of the feature

  • raster_property_name (str) – the raster property name

  • raster_format (str) – the raster output format

  • raster_options (dict[str, Any] | None) – GDAL raster creation key-value options

  • raster_srid (int | None) – the SRID for the output raster

  • raster_scale_x (float | None) – the X component of the output pixel width in units of coordinate reference system

  • raster_scale_y (float | None) – the Y component of the output pixel height in units of coordinate reference system

  • allow_crs_transformation (bool) – When the parameter is false, requests with output rasters in a Coordinate Reference System different from the one defined in the feature type will result in a bad request response code.

Returns

the raster data

Return type

bytes

Examples

Get a raster from a feature raster property:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> feature_type = ...
>>> feature = ...
>>> raster_property_name = ...
>>> raster_data = c.geospatial.get_raster(feature_type.external_id, feature.external_id,
...    raster_property_name, "XYZ", {"SIGNIFICANT_DIGITS": "4"})

Compute

GeospatialAPI.compute(output: dict[str, GeospatialComputeFunction]) GeospatialComputedResponse

Compute <https://developer.cognite.com/api#tag/Geospatial/operation/compute>

Parameters

output (dict[str, GeospatialComputeFunction]) – A mapping of result keys to compute functions.

Returns

Mapping of keys to computed items.

Return type

GeospatialComputedResponse

Examples

Compute the transformation of an ewkt geometry from one SRID to another:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import GeospatialGeometryTransformComputeFunction, GeospatialGeometryValueComputeFunction
>>> c = CogniteClient()
>>> compute_function = GeospatialGeometryTransformComputeFunction(GeospatialGeometryValueComputeFunction("SRID=4326;POLYGON((0 0,10 0,10 10,0 10,0 0))"), srid=23031)
>>> compute_result = c.geospatial.compute(output={"output": compute_function})

Geospatial Data classes

class cognite.client.data_classes.geospatial.CoordinateReferenceSystem(srid: int | None = None, wkt: str | None = None, proj_string: str | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A representation of a coordinate reference system in the geospatial api.
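
Examples

A minimal sketch of defining a custom CRS (the create_coordinate_reference_systems call and the placeholder values are illustrative assumptions):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import CoordinateReferenceSystem
>>> c = CogniteClient()
>>> wkt = ...  # well-known text of the CRS
>>> proj_string = ...  # proj4 string of the CRS
>>> custom_crs = CoordinateReferenceSystem(srid=121111, wkt=wkt, proj_string=proj_string)
>>> res = c.geospatial.create_coordinate_reference_systems(custom_crs)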

class cognite.client.data_classes.geospatial.CoordinateReferenceSystemList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[CoordinateReferenceSystem]

class cognite.client.data_classes.geospatial.Feature(external_id: str | None = None, cognite_client: CogniteClient | None = None, **properties: Any)

Bases: CogniteResource

A representation of a feature in the geospatial api.

dump(camel_case: bool = False) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]
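
Examples

A short sketch (assuming a feature type with a single 'temperature' property):

>>> from cognite.client.data_classes.geospatial import Feature
>>> feature = Feature(external_id="my_feature", temperature=12.4)
>>> feature.dump(camel_case=True)
{'externalId': 'my_feature', 'temperature': 12.4}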

class cognite.client.data_classes.geospatial.FeatureAggregate(cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A result of aggregating features in geospatial api.

class cognite.client.data_classes.geospatial.FeatureAggregateList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[FeatureAggregate]

class cognite.client.data_classes.geospatial.FeatureList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[Feature]

static from_geopandas(feature_type: FeatureType, geodataframe: geopandas.GeoDataFrame, external_id_column: str = 'externalId', property_column_mapping: dict[str, str] | None = None, data_set_id_column: str = 'dataSetId') FeatureList

Convert a GeoDataFrame instance into a FeatureList.

Parameters
  • feature_type (FeatureType) – The feature type the features will conform to

  • geodataframe (geopandas.GeoDataFrame) – the geodataframe instance to convert into features

  • external_id_column (str) – the geodataframe column to use for the feature external id

  • property_column_mapping (dict[str, str] | None) – provides a mapping from feature type property names to geodataframe columns

  • data_set_id_column (str) – the geodataframe column to use for the feature dataSet id

Returns

The list of features converted from the geodataframe rows.

Return type

FeatureList

Examples

Create features from a geopandas dataframe:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureList
>>> c = CogniteClient()
>>> my_feature_type = ...  # some feature type with 'position' and 'temperature' properties
>>> my_geodataframe = ...  # some geodataframe with 'center_xy', 'temp' and 'id' columns
>>> feature_list = FeatureList.from_geopandas(feature_type=my_feature_type, geodataframe=my_geodataframe,
...     external_id_column="id", data_set_id_column="dataSetId",
...     property_column_mapping={'position': 'center_xy', 'temperature': 'temp'})
>>> created_features = c.geospatial.create_features(my_feature_type.external_id, feature_list)

to_geopandas(geometry: str, camel_case: bool = False) geopandas.GeoDataFrame

Convert the instance into a GeoPandas GeoDataFrame.

Parameters
  • geometry (str) – The name of the feature type geometry property to use in the GeoDataFrame

  • camel_case (bool) – Convert column names to camel case (e.g. externalId instead of external_id)

Returns

The GeoPandas GeoDataFrame.

Return type

geopandas.GeoDataFrame

Examples

Convert a FeatureList into a GeoPandas GeoDataFrame:

>>> from cognite.client.data_classes.geospatial import PropertyAndSearchSpec
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> features = c.geospatial.search_features(...)
>>> gdf = features.to_geopandas(
...     geometry="position",
...     camel_case=False
... )
>>> gdf.head()

class cognite.client.data_classes.geospatial.FeatureType(external_id: str | None = None, data_set_id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, properties: dict[str, Any] | None = None, search_spec: dict[str, Any] | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A representation of a feature type in the geospatial api.

class cognite.client.data_classes.geospatial.FeatureTypeList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[FeatureType]

class cognite.client.data_classes.geospatial.FeatureTypePatch(external_id: 'str | None' = None, property_patches: 'Patches | None' = None, search_spec_patches: 'Patches | None' = None)

Bases: object

class cognite.client.data_classes.geospatial.FeatureTypeUpdate(external_id: str | None = None, add: PropertyAndSearchSpec | None = None, remove: PropertyAndSearchSpec | None = None, cognite_client: CogniteClient | None = None)

Bases: object

A representation of a feature type update in the geospatial api.

class cognite.client.data_classes.geospatial.GeospatialComputeFunction

Bases: ABC

A geospatial compute function

abstract to_json_payload() dict

Convert function to json for request payload

class cognite.client.data_classes.geospatial.GeospatialComputedItem(resource: dict[str, Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A representation of an item computed from geospatial.

class cognite.client.data_classes.geospatial.GeospatialComputedItemList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[GeospatialComputedItem]

A list of items computed from geospatial.

class cognite.client.data_classes.geospatial.GeospatialComputedResponse(computed_item_list: GeospatialComputedItemList, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

The geospatial compute response.

class cognite.client.data_classes.geospatial.GeospatialGeometryComputeFunction

Bases: GeospatialComputeFunction, ABC

A geospatial geometry compute function

class cognite.client.data_classes.geospatial.GeospatialGeometryTransformComputeFunction(geospatial_geometry_compute_function: GeospatialComputeFunction, srid: int)

Bases: GeospatialComputeFunction

A stTransform geospatial compute function

to_json_payload() dict

Convert function to json for request payload

class cognite.client.data_classes.geospatial.GeospatialGeometryValueComputeFunction(ewkt: str)

Bases: GeospatialGeometryComputeFunction

A geospatial geometry value compute function. Accepts an extended well-known text (EWKT) representation of the geometry, i.e. a well-known text prefixed with a spatial reference identifier, see https://docs.geotools.org/stable/javadocs/org/opengis/referencing/doc-files/WKT.html

Parameters

ewkt (str) – the EWKT representation of the geometry

to_json_payload() dict

Convert function to json for request payload

class cognite.client.data_classes.geospatial.OrderSpec(property: str, direction: str)

Bases: object

An order specification with respect to a property.
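
Examples

A construction sketch (assuming direction is one of "ASC" or "DESC"):

>>> from cognite.client.data_classes.geospatial import OrderSpec
>>> order = OrderSpec(property="temperature", direction="ASC")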

class cognite.client.data_classes.geospatial.Patches(add: 'dict[str, Any] | None' = None, remove: 'list[str] | None' = None)

Bases: object
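
Examples

A minimal sketch of building a property patch for use with GeospatialAPI.patch_feature_types (the property type dict is an illustrative assumption):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.geospatial import FeatureTypePatch, Patches
>>> c = CogniteClient()
>>> patch = FeatureTypePatch(
...     external_id="my_feature_type",
...     property_patches=Patches(add={"altitude": {"type": "DOUBLE"}}, remove=["temperature"]))
>>> res = c.geospatial.patch_feature_types(patch=patch)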

class cognite.client.data_classes.geospatial.PropertyAndSearchSpec(properties: dict[str, Any] | list[str] | None = None, search_spec: dict[str, Any] | list[str] | None = None)

Bases: object

A representation of a feature type property and search spec.

class cognite.client.data_classes.geospatial.RasterMetadata(**properties: Any)

Bases: object

Raster metadata

cognite.client.data_classes.geospatial.nan_to_none(column_value: Any) Any

Convert NaN value to None.
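
Examples

For example, NaN values (e.g. from geopandas columns) map to None while other values pass through:

>>> from cognite.client.data_classes.geospatial import nan_to_none
>>> nan_to_none(float("nan")) is None
True
>>> nan_to_none(42.0)
42.0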

Synthetic time series

Calculate the result of a function on time series

SyntheticDatapointsAPI.query(expressions: str | sympy.Expr | Sequence[str | sympy.Expr], start: int | str | datetime, end: int | str | datetime, limit: int | None = None, variables: dict[str, str | TimeSeries] | None = None, aggregate: str | None = None, granularity: str | None = None) Datapoints | DatapointsList

Calculate the result of a function on time series.

Parameters
  • expressions (str | sympy.Expr | Sequence[str | sympy.Expr]) – Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API ts{} syntax, or contain variable names to be replaced using the variables parameter.

  • start (int | str | datetime) – Inclusive start.

  • end (int | str | datetime) – Exclusive end

  • limit (int | None) – Number of datapoints per expression to retrieve.

  • variables (dict[str, str | TimeSeries] | None) – An optional map of symbol replacements.

  • aggregate (str | None) – Use this aggregate when replacing entries from variables; it does not affect time series given in the ts{} syntax.

  • granularity (str | None) – Use this granularity with the aggregate.

Returns

A Datapoints object (for a single expression) or a DatapointsList object (for multiple expressions) containing the calculated data.

Return type

Datapoints | DatapointsList

Examples

Request a synthetic time series query with direct syntax

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> dps = c.time_series.data.synthetic.query(expressions="TS{id:123} + TS{externalId:'abc'}", start="2w-ago", end="now")

Use variables to re-use an expression:

>>> vars = {"A": "my_ts_external_id", "B": c.time_series.retrieve(id=1)}
>>> dps = c.time_series.data.synthetic.query(expressions="A+B", start="2w-ago", end="now", variables=vars)

Use sympy to build complex expressions:

>>> from sympy import symbols, cos, sin
>>> a = symbols('a')
>>> dps = c.time_series.data.synthetic.query([sin(a), cos(a)], start="2w-ago", end="now", variables={"a": "my_ts_external_id"}, aggregate='interpolation', granularity='1m')

Time series

Warning

TimeSeries unit support is a new feature:
  • The API specification is in beta.

  • The SDK implementation is in alpha.

Unit is implemented in the TimeSeries APIs with the parameters unit_external_id and unit_quantity in the methods below. Only the use of these arguments is in alpha; using the methods below without them is stable.

Thus, breaking changes may occur without further notice, see Alpha and Beta Features for more information.

Retrieve a time series by id

TimeSeriesAPI.retrieve(id: int | None = None, external_id: str | None = None) TimeSeries | None

Retrieve a single time series by id.

Parameters
  • id (int | None) – ID

  • external_id (str | None) – External ID

Returns

Requested time series or None if it does not exist.

Return type

TimeSeries | None

Examples

Get time series by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(id=1)

Get time series by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(external_id="1")

Retrieve multiple time series by id

TimeSeriesAPI.retrieve_multiple(ids: Sequence[int] | None = None, external_ids: Sequence[str] | None = None, ignore_unknown_ids: bool = False) TimeSeriesList

Retrieve multiple time series by id.

Parameters
  • ids (Sequence[int] | None) – IDs

  • external_ids (Sequence[str] | None) – External IDs

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested time series.

Return type

TimeSeriesList

Examples

Get time series by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve_multiple(ids=[1, 2, 3])

Get time series by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve_multiple(external_ids=["abc", "def"])

List time series

TimeSeriesAPI.list(name: str | None = None, unit: str | None = None, unit_external_id: str | None = None, unit_quantity: str | None = None, is_string: bool | None = None, is_step: bool | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: int | Sequence[int] | None = None, asset_subtree_external_ids: str | Sequence[str] | None = None, data_set_ids: int | Sequence[int] | None = None, data_set_external_ids: str | Sequence[str] | None = None, metadata: dict[str, Any] | None = None, external_id_prefix: str | None = None, created_time: dict[str, Any] | None = None, last_updated_time: dict[str, Any] | None = None, partitions: int | None = None, limit: int | None = 25) TimeSeriesList

Iterate over time series

Fetches time series as they are iterated over, so you keep a limited number of objects in memory.

Parameters
  • name (str | None) – Name of the time series. Often referred to as tag.

  • unit (str | None) – Unit of the time series.

  • unit_external_id (str | None) – Filter on unit external ID.

  • unit_quantity (str | None) – Filter on unit quantity.

  • is_string (bool | None) – Whether the time series is a string time series.

  • is_step (bool | None) – Whether the time series is a step (piecewise constant) time series.

  • asset_ids (Sequence[int] | None) – List time series related to these assets.

  • asset_external_ids (Sequence[str] | None) – List time series related to these assets.

  • asset_subtree_ids (int | Sequence[int] | None) – Asset subtree id or list of asset subtree ids to filter on.

  • asset_subtree_external_ids (str | Sequence[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.

  • data_set_ids (int | Sequence[int] | None) – Return only time series in the specified data set(s) with this id / these ids.

  • data_set_external_ids (str | Sequence[str] | None) – Return only time series in the specified data set(s) with this external id / these external ids.

  • metadata (dict[str, Any] | None) – Custom, application specific metadata. String key -> String value

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • created_time (dict[str, Any] | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • last_updated_time (dict[str, Any] | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • partitions (int | None) – Retrieve time series in parallel using this number of workers. Also requires limit=None to be passed.

  • limit (int | None) – Maximum number of time series to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

The requested time series.

Return type

TimeSeriesList

Examples

List time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.list(limit=5)

Iterate over time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for ts in c.time_series:
...     ts # do something with the time_series

Iterate over chunks of time series to reduce memory load:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for ts_list in c.time_series(chunk_size=2500):
...     ts_list # do something with the time_series

Aggregate time series

TimeSeriesAPI.aggregate(filter: TimeSeriesFilter | dict | None = None) list[TimeSeriesAggregate]

Aggregate time series

Parameters

filter (TimeSeriesFilter | dict | None) – Filter on time series filter with exact match

Returns

List of time series aggregates

Return type

list[TimeSeriesAggregate]

Examples

Aggregate time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.aggregate(filter={"unit": "kpa"})

Aggregate Time Series Count

TimeSeriesAPI.aggregate_count(advanced_filter: Filter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int

Count of time series matching the specified filters and search.

Parameters
  • advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count.

  • filter (TimeSeriesFilter | dict | None) – The filter to narrow down time series to count requiring exact match.

Returns

The number of time series matching the specified filters and search.

Return type

int

Examples

Count the number of time series in your CDF project:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.time_series.aggregate_count()

Count the number of numeric time series in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> is_numeric = filters.Equals(TimeSeriesProperty.is_string, False)
>>> count = c.time_series.aggregate_count(advanced_filter=is_numeric)

Aggregate Time Series Values Cardinality

TimeSeriesAPI.aggregate_cardinality_values(property: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int

Find approximate property count for time series.

Parameters
  • property (TimeSeriesProperty | str | list[str]) – The property to count the cardinality of.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.

Returns

The number of properties matching the specified filters and search.

Return type

int

Examples

Count the number of different units used for time series in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> unit_count = c.time_series.aggregate_cardinality_values(TimeSeriesProperty.unit)

Count the number of timezones (metadata key) for time series with the word “critical” in the description in your CDF project, but exclude timezones from america:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters, aggregations as aggs
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> not_america = aggs.Not(aggs.Prefix("america"))
>>> is_critical = filters.Search(TimeSeriesProperty.description, "critical")
>>> timezone_count = c.time_series.aggregate_cardinality_values(
...     TimeSeriesProperty.metadata_key("timezone"),
...     advanced_filter=is_critical,
...     aggregate_filter=not_america)

Aggregate Time Series Property Cardinality

TimeSeriesAPI.aggregate_cardinality_properties(path: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) int

Find approximate paths count for time series.

Parameters
  • path (TimeSeriesProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.

Returns

The number of properties matching the specified filters and search.

Return type

int

Examples

Count the number of metadata keys in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> key_count = c.time_series.aggregate_cardinality_properties(TimeSeriesProperty.metadata)

Aggregate Time Series Unique Values

TimeSeriesAPI.aggregate_unique_values(property: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) UniqueResultList

Get unique properties with counts for time series.

Parameters
  • property (TimeSeriesProperty | str | list[str]) – The property to group by.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.

Returns

List of unique values of time series matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the timezones (metadata key) with count for your time series in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.metadata_key("timezone"))
>>> print(result.unique)

Get the different units with count used for time series created after 2020-01-01 in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(TimeSeriesProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.unit, advanced_filter=created_after_2020)
>>> print(result.unique)

Get the different units with count for time series updated after 2020-01-01 in your CDF project, but exclude all units that start with “test”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> created_after_2020 = filters.Range(TimeSeriesProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.time_series.aggregate_unique_values(TimeSeriesProperty.unit, advanced_filter=created_after_2020, aggregate_filter=not_test)
>>> print(result.unique)

Aggregate Time Series Unique Properties

TimeSeriesAPI.aggregate_unique_properties(path: TimeSeriesProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: TimeSeriesFilter | dict | None = None) UniqueResultList

Get unique paths with counts for time series.

Parameters
  • path (TimeSeriesProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the time series to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (TimeSeriesFilter | dict | None) – The filter to narrow down the time series to count requiring exact match.

Returns

List of unique values of time series matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the metadata keys with count for your time series in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty
>>> c = CogniteClient()
>>> result = c.time_series.aggregate_unique_properties(TimeSeriesProperty.metadata)

Search for time series

TimeSeriesAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: TimeSeriesFilter | dict | None = None, limit: int = 25) TimeSeriesList

Search for time series. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.

Parameters
  • name (str | None) – Prefix and fuzzy search on name.

  • description (str | None) – Prefix and fuzzy search on description.

  • query (str | None) – Search on name and description using wildcard search on each of the words (separated by spaces). Retrieves results where at least one word must match. Example: ‘some other’

  • filter (TimeSeriesFilter | dict | None) – Filter to apply. Performs exact match on these fields.

  • limit (int) – Max number of results to return.

Returns

List of requested time series.

Return type

TimeSeriesList

Examples

Search for a time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.search(name="some name")

Search for all time series connected to asset with id 123:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.search(filter={"asset_ids":[123]})

Create time series

TimeSeriesAPI.create(time_series: Sequence[TimeSeries]) TimeSeriesList
TimeSeriesAPI.create(time_series: TimeSeries) TimeSeries

Create one or more time series.

Parameters

time_series (TimeSeries | Sequence[TimeSeries]) – TimeSeries or list of TimeSeries to create.

Returns

The created time series.

Return type

TimeSeries | TimeSeriesList

Examples

Create a new time series:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeries
>>> c = CogniteClient()
>>> ts = c.time_series.create(TimeSeries(name="my_ts", data_set_id=123, external_id="foo"))

Delete time series

TimeSeriesAPI.delete(id: int | Sequence[int] | None = None, external_id: str | Sequence[str] | None = None, ignore_unknown_ids: bool = False) None

Delete one or more time series.

Parameters
  • id (int | Sequence[int] | None) – Id or list of ids

  • external_id (str | Sequence[str] | None) – External ID or list of external ids

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Examples

Delete time series by id or external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.delete(id=[1,2,3], external_id="3")

Filter time series

TimeSeriesAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) TimeSeriesList

Advanced filter time series

Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.

Parameters
  • filter (Filter | dict) – Filter to apply.

  • sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. You can sort by up to two properties; the default sort order is ascending.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

List of time series that match the filter criteria.

Return type

TimeSeriesList

Examples

Find all numeric time series and return them sorted by external id:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_numeric = f.Equals("is_string", False)
>>> res = c.time_series.filter(filter=is_numeric, sort="external_id")

Note that you can check the API documentation above to see which properties you can filter on with which filters.

To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the TimeSeriesProperty and SortableTimeSeriesProperty enums.

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.time_series import TimeSeriesProperty, SortableTimeSeriesProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_numeric = f.Equals(TimeSeriesProperty.is_string, False)
>>> res = c.time_series.filter(filter=is_numeric, sort=SortableTimeSeriesProperty.external_id)

Update time series

TimeSeriesAPI.update(item: Sequence[TimeSeries | TimeSeriesUpdate]) TimeSeriesList
TimeSeriesAPI.update(item: TimeSeries | TimeSeriesUpdate) TimeSeries

Update one or more time series.

Parameters

item (TimeSeries | TimeSeriesUpdate | Sequence[TimeSeries | TimeSeriesUpdate]) – Time series to update

Returns

Updated time series.

Return type

TimeSeries | TimeSeriesList

Examples

Update a time series that you have fetched. This will perform a full update of the time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.retrieve(id=1)
>>> res.description = "New description"
>>> res = c.time_series.update(res)

Perform a partial update on a time series, updating the description and adding a new field to metadata:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeriesUpdate
>>> c = CogniteClient()
>>> my_update = TimeSeriesUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.time_series.update(my_update)

Upsert time series

TimeSeriesAPI.upsert(item: Sequence[TimeSeries], mode: Literal['patch', 'replace'] = 'patch') TimeSeriesList
TimeSeriesAPI.upsert(item: TimeSeries, mode: Literal['patch', 'replace'] = 'patch') TimeSeries
Upsert time series, i.e., update if it exists, and create if it does not exist.

Note this is a convenience method that handles the upserting for you by first calling update on all items, and if any of them fail because they do not exist, it will create them instead.

For more details, see Upsert.

Parameters
  • item (TimeSeries | Sequence[TimeSeries]) – TimeSeries or list of TimeSeries to upsert.

  • mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the time series are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified. Note replace will skip beta properties (unit_external_id), if you want to replace the beta properties you have to use the update method.

Returns

The upserted time series.

Return type

TimeSeries | TimeSeriesList

Examples

Upsert for TimeSeries:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeries
>>> c = CogniteClient()
>>> existing_time_series = c.time_series.retrieve(id=1)
>>> existing_time_series.description = "New description"
>>> new_time_series = TimeSeries(external_id="new_timeSeries", description="New timeSeries")
>>> res = c.time_series.upsert([existing_time_series, new_time_series], mode="replace")

Time Series Data classes

class cognite.client.data_classes.time_series.SortableTimeSeriesProperty(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.time_series.TimeSeries(id: int | None = None, external_id: str | None = None, name: str | None = None, is_string: bool | None = None, metadata: dict[str, str] | None = None, unit: str | None = None, unit_external_id: str | None = None, asset_id: int | None = None, is_step: bool | None = None, description: str | None = None, security_categories: Sequence[int] | None = None, data_set_id: int | None = None, created_time: int | None = None, last_updated_time: int | None = None, legacy_name: str | None = None, cognite_client: CogniteClient | None = None)

Bases: CogniteResource

A representation of a time series.

Parameters
  • id (int | None) – A server-generated ID for the object.

  • external_id (str | None) – The externally supplied ID for the time series.

  • name (str | None) – The display short name of the time series. Note: Value of this field can differ from name presented by older versions of API 0.3-0.6.

  • is_string (bool | None) – Whether the time series is string valued or not.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.

  • unit (str | None) – The physical unit of the time series.

  • unit_external_id (str | None) – The physical unit of the time series (reference to unit catalog). Only available for numeric time series.

  • asset_id (int | None) – Asset ID of equipment linked to this time series.

  • is_step (bool | None) – Whether the time series is a step series or not.

  • description (str | None) – Description of the time series.

  • security_categories (Sequence[int] | None) – The required security categories to access this time series.

  • data_set_id (int | None) – The dataSet Id for the item.

  • created_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • last_updated_time (int | None) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • legacy_name (str | None) – Set a value for legacyName to allow applications using API v0.3, v04, v05, and v0.6 to access this time series. The legacy name is the human-readable name for the time series and is mapped to the name field used in API versions 0.3-0.6. The legacyName field value must be unique, and setting this value to an already existing value will return an error. We recommend that you set this field to the same value as externalId.

  • cognite_client (CogniteClient | None) – The client to associate with this object.

asset() Asset

Returns the asset this time series belongs to.

Returns

The asset given by its asset_id.

Return type

Asset

Raises

ValueError – If asset_id is missing.
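
Examples

A short sketch (assuming the time series has asset_id set):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ts = c.time_series.retrieve(id=1)
>>> parent_asset = ts.asset()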

count() int

Returns the number of datapoints in this time series.

This result may not be completely accurate, as it is based on aggregates which may be occasionally out of date.

Returns

The number of datapoints in this time series.

Return type

int

Raises

RuntimeError – If the time series is string, as count aggregate is only supported for numeric data
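
Examples

A short sketch for a numeric time series:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ts = c.time_series.retrieve(id=1)
>>> n_datapoints = ts.count()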


first() Datapoint | None

Returns the first datapoint in this time series. If empty, returns None.

Returns

A datapoint object containing the value and timestamp of the first datapoint.

Return type

Datapoint | None
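
Examples

A short sketch:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ts = c.time_series.retrieve(id=1)
>>> dp = ts.first()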

latest(before: int | str | datetime | None = None) Datapoint | None

Returns the latest datapoint in this time series. If empty, returns None.

Parameters

before (int | str | datetime | None) – Get the latest datapoint before this time; if omitted, the latest datapoint overall is returned.

Returns

A datapoint object containing the value and timestamp of the latest datapoint.

Return type

Datapoint | None
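
Examples

A short sketch, using a relative time string for before:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> ts = c.time_series.retrieve(id=1)
>>> dp = ts.latest(before="1d-ago")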

class cognite.client.data_classes.time_series.TimeSeriesAggregate(count: int | None = None, **kwargs: Any)

Bases: dict

A result of aggregating time series.

Parameters
  • count (int | None) – No description.

  • **kwargs (Any) – No description.

class cognite.client.data_classes.time_series.TimeSeriesFilter(name: str | None = None, unit: str | None = None, unit_external_id: str | None = None, unit_quantity: str | None = None, is_string: bool | None = None, is_step: bool | None = None, metadata: dict[str, str] | None = None, asset_ids: Sequence[int] | None = None, asset_external_ids: Sequence[str] | None = None, asset_subtree_ids: Sequence[dict[str, Any]] | None = None, data_set_ids: Sequence[dict[str, Any]] | None = None, external_id_prefix: str | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None)

Bases: CogniteFilter

Filtering parameters for time series; performs exact match on the given fields.

Parameters
  • name (str | None) – Filter on name.

  • unit (str | None) – Filter on unit.

  • unit_external_id (str | None) – Filter on unit external ID.

  • unit_quantity (str | None) – Filter on unit quantity.

  • is_string (bool | None) – Filter on isString.

  • is_step (bool | None) – Filter on isStep.

  • metadata (dict[str, str] | None) – Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.

  • asset_ids (Sequence[int] | None) – Only include time series that reference these specific asset IDs.

  • asset_external_ids (Sequence[str] | None) – Asset External IDs of related equipment that this time series relates to.

  • asset_subtree_ids (Sequence[dict[str, Any]] | None) – Only include time series that are related to an asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.

  • data_set_ids (Sequence[dict[str, Any]] | None) – No description.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.
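
Examples

A minimal sketch, passing the filter to TimeSeriesAPI.search:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import TimeSeriesFilter
>>> c = CogniteClient()
>>> flt = TimeSeriesFilter(unit="kPa", is_string=False)
>>> res = c.time_series.search(filter=flt)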

class cognite.client.data_classes.time_series.TimeSeriesList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[TimeSeries], IdTransformerMixin

class cognite.client.data_classes.time_series.TimeSeriesProperty(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.time_series.TimeSeriesUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

Changes will be applied to time series.

Parameters
  • id (int | None) – A server-generated ID for the object.

  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

Sequences

Retrieve a sequence by id

SequencesAPI.retrieve(id: int | None = None, external_id: str | None = None) Sequence | None

Retrieve a single sequence by id.

Parameters
  • id (int | None) – ID

  • external_id (str | None) – External ID

Returns

Requested sequence or None if it does not exist.

Return type

Sequence | None

Examples

Get sequence by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(id=1)

Get sequence by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(external_id="1")

Retrieve multiple sequences by id

SequencesAPI.retrieve_multiple(ids: SequenceType[int] | None = None, external_ids: SequenceType[str] | None = None, ignore_unknown_ids: bool = False) SequenceList

Retrieve multiple sequences by id.

Parameters
  • ids (SequenceType[int] | None) – IDs

  • external_ids (SequenceType[str] | None) – External IDs

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested sequences.

Return type

SequenceList

Examples

Get sequences by id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve_multiple(ids=[1, 2, 3])

Get sequences by external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve_multiple(external_ids=["abc", "def"])

List sequences

SequencesAPI.list(name: str | None = None, external_id_prefix: str | None = None, metadata: dict[str, str] | None = None, asset_ids: SequenceType[int] | None = None, asset_subtree_ids: int | SequenceType[int] | None = None, asset_subtree_external_ids: str | SequenceType[str] | None = None, data_set_ids: int | SequenceType[int] | None = None, data_set_external_ids: str | SequenceType[str] | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, limit: int | None = 25) SequenceList

Iterate over sequences

Fetches sequences as they are iterated over, so you keep a limited number of objects in memory.

Parameters
  • name (str | None) – Filter out sequences that do not have this exact name.

  • external_id_prefix (str | None) – Filter out sequences that do not have this string as the start of the externalId

  • metadata (dict[str, str] | None) – Filter out sequences that do not match these metadata fields and values (case-sensitive). Format is {“key1”:”value1”,”key2”:”value2”}.

  • asset_ids (SequenceType[int] | None) – Filter out sequences that are not linked to any of these assets.

  • asset_subtree_ids (int | SequenceType[int] | None) – Asset subtree id or list of asset subtree ids to filter on.

  • asset_subtree_external_ids (str | SequenceType[str] | None) – Asset subtree external id or list of asset subtree external ids to filter on.

  • data_set_ids (int | SequenceType[int] | None) – Return only sequences in the specified data set(s) with this id / these ids.

  • data_set_external_ids (str | SequenceType[str] | None) – Return only sequences in the specified data set(s) with this external id / these external ids.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps. Possible keys are min and max, with values given as time stamps in ms.

  • limit (int | None) – Max number of sequences to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

The requested sequences.

Return type

SequenceList

Examples

List sequences:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.list(limit=5)

Iterate over sequences:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for seq in c.sequences:
...     seq # do something with the sequences

Iterate over chunks of sequences to reduce memory load:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for seq_list in c.sequences(chunk_size=2500):
...     seq_list # do something with the sequences

Aggregate sequences

SequencesAPI.aggregate(filter: SequenceFilter | dict | None = None) list[SequenceAggregate]

Aggregate sequences

Parameters

filter (SequenceFilter | dict | None) – Filter on sequence filter with exact match

Returns

List of sequence aggregates

Return type

list[SequenceAggregate]

Examples

Aggregate sequences:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.aggregate(filter={"external_id_prefix": "prefix"})

Aggregate Sequences Count

SequencesAPI.aggregate_count(advanced_filter: Filter | dict | None = None, filter: SequenceFilter | dict | None = None) int

Count of sequences matching the specified filters and search.

Parameters
  • advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count.

  • filter (SequenceFilter | dict | None) – The filter to narrow down sequences to count requiring exact match.

Returns

The number of sequences matching the specified filters and search.

Return type

int

Examples

Count the number of sequences in your CDF project:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_count()

Count the number of sequences with external id prefixed with “mapping:” in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> is_mapping = filters.Prefix(SequenceProperty.external_id, "mapping:")
>>> count = c.sequences.aggregate_count(advanced_filter=is_mapping)

Aggregate Sequences Value Cardinality

SequencesAPI.aggregate_cardinality_values(property: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) int

Find approximate property count for sequences.

Parameters
  • property (SequenceProperty | str | list[str]) – The property to count the cardinality of.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.

Returns

The number of properties matching the specified filters and search.

Return type

int

Examples

Count the number of different values for the metadata key “efficiency” used for sequences in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_cardinality_values(SequenceProperty.metadata_key("efficiency"))

Count the number of timezones (metadata key) for sequences with the word “critical” in the description in your CDF project, but exclude timezones from america:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters, aggregations as aggs
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> not_america = aggs.Not(aggs.Prefix("america"))
>>> is_critical = filters.Search(SequenceProperty.description, "critical")
>>> timezone_count = c.sequences.aggregate_cardinality_values(
...     SequenceProperty.metadata_key("timezone"),
...     advanced_filter=is_critical,
...     aggregate_filter=not_america)

Aggregate Sequences Property Cardinality

SequencesAPI.aggregate_cardinality_properties(path: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) int

Find approximate paths count for sequences.

Parameters
  • path (SequenceProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.

Returns

The number of properties matching the specified filters and search.

Return type

int

Examples

Count the number of different metadata keys in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> count = c.sequences.aggregate_cardinality_properties(SequenceProperty.metadata)

Aggregate Sequences Unique Values

SequencesAPI.aggregate_unique_values(property: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) UniqueResultList

Get unique properties with counts for sequences.

Parameters
  • property (SequenceProperty | str | list[str]) – The property to group by.

  • advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.

Returns

List of unique values of sequences matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the timezones (metadata key) with count for your sequences in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata_key("timezone"))
>>> print(result.unique)

Get the different metadata keys with count used for sequences created after 2020-01-01 in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> created_after_2020 = filters.Range(SequenceProperty.created_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata, advanced_filter=created_after_2020)
>>> print(result.unique)

Get the different metadata keys with count for sequences updated after 2020-01-01 in your CDF project, but exclude all metadata keys that start with “test”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> from cognite.client.data_classes import aggregations as aggs, filters
>>> from cognite.client.utils import timestamp_to_ms
>>> from datetime import datetime
>>> c = CogniteClient()
>>> not_test = aggs.Not(aggs.Prefix("test"))
>>> created_after_2020 = filters.Range(SequenceProperty.last_updated_time, gte=timestamp_to_ms(datetime(2020, 1, 1)))
>>> result = c.sequences.aggregate_unique_values(SequenceProperty.metadata, advanced_filter=created_after_2020, aggregate_filter=not_test)
>>> print(result.unique)

Aggregate Sequences Unique Properties

SequencesAPI.aggregate_unique_properties(path: SequenceProperty | str | list[str], advanced_filter: Filter | dict | None = None, aggregate_filter: AggregationFilter | dict | None = None, filter: SequenceFilter | dict | None = None) UniqueResultList

Find approximate unique sequence properties.

Parameters
  • path (SequenceProperty | str | list[str]) – The scope in every document to aggregate properties. The only value allowed now is [“metadata”]. It means to aggregate only metadata properties (aka keys).

  • advanced_filter (Filter | dict | None) – The filter to narrow down the sequences to count cardinality.

  • aggregate_filter (AggregationFilter | dict | None) – The filter to apply to the resulting buckets.

  • filter (SequenceFilter | dict | None) – The filter to narrow down the sequences to count requiring exact match.

Returns

List of unique values of sequences matching the specified filters and search.

Return type

UniqueResultList

Examples

Get the metadata keys with count for your sequences in your CDF project:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.sequences import SequenceProperty
>>> c = CogniteClient()
>>> result = c.sequences.aggregate_unique_properties(SequenceProperty.metadata)

Search for sequences

SequencesAPI.search(name: str | None = None, description: str | None = None, query: str | None = None, filter: SequenceFilter | dict | None = None, limit: int = 25) SequenceList

Search for sequences. Primarily meant for human-centric use-cases and data exploration, not for programs, since matching and ordering may change over time. Use the list function if stable or exact matches are required.

Parameters
  • name (str | None) – Prefix and fuzzy search on name.

  • description (str | None) – Prefix and fuzzy search on description.

  • query (str | None) – Search on name and description using wildcard search on each of the words (separated by spaces). Retrieves results where at least one word must match. Example: ‘some other’

  • filter (SequenceFilter | dict | None) – Filter to apply. Performs exact match on these fields.

  • limit (int) – Max number of results to return.

Returns

The search result as a SequenceList

Return type

SequenceList

Examples

Search for a sequence:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.search(name="some name")

Create a sequence

SequencesAPI.create(sequence: Sequence) Sequence
SequencesAPI.create(sequence: Sequence[Sequence]) SequenceList

Create one or more sequences.

Parameters

sequence (Sequence | SequenceType[Sequence]) – Sequence or list of Sequence to create. The Sequence columns parameter is a list of objects with fields externalId (external id of the column, when omitted, they will be given ids of ‘column0, column1, …’), valueType (data type of the column, either STRING, LONG, or DOUBLE, with default DOUBLE), name, description, metadata (optional fields to describe and store information about the data in the column). Other fields will be removed automatically, so a columns definition from a different sequence object can be passed here.

Returns

The created sequence(s).

Return type

Sequence | SequenceList

Examples

Create a new sequence:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> column_def = [
...     {"valueType": "STRING", "externalId": "user", "description": "some description"},
...     {"valueType": "DOUBLE", "externalId": "amount"}]
>>> seq = c.sequences.create(Sequence(external_id="my_sequence", columns=column_def))

Create a new sequence with the same column specifications as an existing sequence:

>>> seq2 = c.sequences.create(Sequence(external_id="my_copied_sequence", columns=seq.columns))

Delete sequences

SequencesAPI.delete(id: int | SequenceType[int] | None = None, external_id: str | SequenceType[str] | None = None, ignore_unknown_ids: bool = False) None

Delete one or more sequences.

Parameters
  • id (int | SequenceType[int] | None) – Id or list of ids

  • external_id (str | SequenceType[str] | None) – External ID or list of external ids

  • ignore_unknown_ids (bool) – Ignore IDs and external IDs that are not found rather than throw an exception.

Examples

Delete sequences by id or external id:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.delete(id=[1,2,3], external_id="3")

Filter sequences

SequencesAPI.filter(filter: Filter | dict, sort: SortSpec | list[SortSpec] | None = None, limit: int | None = 25) SequenceList

Advanced filter sequences

Advanced filter lets you create complex filtering expressions that combine simple operations, such as equals, prefix, exists, etc., using boolean operators and, or, and not. It applies to basic fields as well as metadata.

Parameters
  • filter (Filter | dict) – Filter to apply.

  • sort (SortSpec | list[SortSpec] | None) – The criteria to sort by. You can sort by up to two properties; the default sort order is ascending.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

List of sequences that match the filter criteria.

Return type

SequenceList

Examples

Find all sequences with asset id ‘123’ and metadata key ‘type’ equals ‘efficiency’ and return them sorted by created time:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> c = CogniteClient()
>>> f = filters
>>> is_asset = f.Equals("asset_id", 123)
>>> is_efficiency = f.Equals(["metadata", "type"], "efficiency")
>>> res = c.sequences.filter(filter=f.And(is_asset, is_efficiency), sort="created_time")

Note that you can check the API documentation above to see which properties you can filter on with which filters.

To make it easier to avoid spelling mistakes and easier to look up available properties for filtering and sorting, you can also use the SequenceProperty and SortableSequenceProperty enums.

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.sequences import SequenceProperty, SortableSequenceProperty
>>> c = CogniteClient()
>>> f = filters
>>> is_asset = f.Equals(SequenceProperty.asset_id, 123)
>>> is_efficiency = f.Equals(SequenceProperty.metadata_key("type"), "efficiency")
>>> res = c.sequences.filter(filter=f.And(is_asset, is_efficiency),
...                          sort=SortableSequenceProperty.created_time)

Update sequences

SequencesAPI.update(item: Sequence | SequenceUpdate) Sequence
SequencesAPI.update(item: SequenceType[Sequence | SequenceUpdate]) SequenceList

Update one or more sequences.

Parameters

item (Sequence | SequenceUpdate | SequenceType[Sequence | SequenceUpdate]) – Sequences to update

Returns

Updated sequences.

Return type

Sequence | SequenceList

Examples

Update a sequence that you have fetched. This will perform a full update of the sequence:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.retrieve(id=1)
>>> res.description = "New description"
>>> res = c.sequences.update(res)

Perform a partial update on a sequence, updating the description and adding a new field to metadata:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>> my_update = SequenceUpdate(id=1).description.set("New description").metadata.add({"key": "value"})
>>> res = c.sequences.update(my_update)

Updating column definitions

Currently, updating the column definitions of a sequence is only supported through partial update, using add, remove and modify methods on the columns property.

Add a single new column:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>>
>>> my_update = SequenceUpdate(id=1).columns.add({"valueType": "STRING", "externalId": "user", "description": "some description"})
>>> res = c.sequences.update(my_update)

Add multiple new columns:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>>
>>> column_def = [
...     {"valueType": "STRING", "externalId": "user", "description": "some description"},
...     {"valueType": "DOUBLE", "externalId": "amount"}]
>>> my_update = SequenceUpdate(id=1).columns.add(column_def)
>>> res = c.sequences.update(my_update)

Remove a single column:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>>
>>> my_update = SequenceUpdate(id=1).columns.remove("col_external_id1")
>>> res = c.sequences.update(my_update)

Remove multiple columns:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate
>>> c = CogniteClient()
>>>
>>> my_update = SequenceUpdate(id=1).columns.remove(["col_external_id1", "col_external_id2"])
>>> res = c.sequences.update(my_update)

Update existing columns:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import SequenceUpdate, SequenceColumnUpdate
>>> c = CogniteClient()
>>>
>>> column_updates = [
...     SequenceColumnUpdate(external_id="col_external_id_1").external_id.set("new_col_external_id"),
...     SequenceColumnUpdate(external_id="col_external_id_2").description.set("my new description"),
... ]
>>> my_update = SequenceUpdate(id=1).columns.modify(column_updates)
>>> res = c.sequences.update(my_update)

Upsert sequences

SequencesAPI.upsert(item: Sequence[Sequence], mode: Literal['patch', 'replace'] = 'patch') SequenceList
SequencesAPI.upsert(item: Sequence, mode: Literal['patch', 'replace'] = 'patch') Sequence

Upsert sequences, i.e., update the sequences that already exist and create those that do not.

Note that this is a convenience method: it first calls update on all items, and any items that fail because they do not exist are created instead.

For more details, see Upsert.

Parameters
  • item (Sequence | SequenceType[Sequence]) – Sequence or list of sequences to upsert.

  • mode (Literal["patch", "replace"]) – Whether to patch or replace in the case the sequences are existing. If you set ‘patch’, the call will only update fields with non-null values (default). Setting ‘replace’ will unset any fields that are not specified.

Returns

The upserted sequence(s).

Return type

Sequence | SequenceList

Examples

Upsert for sequences:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> existing_sequence = c.sequences.retrieve(id=1)
>>> existing_sequence.description = "New description"
>>> new_sequence = Sequence(external_id="new_sequence", description="New sequence")
>>> res = c.sequences.upsert([existing_sequence, new_sequence], mode="replace")

Retrieve data

SequencesDataAPI.retrieve(start: int, end: int | None, column_external_ids: SequenceType[str] | None = None, external_id: str | SequenceType[str] | None = None, id: int | SequenceType[int] | None = None, limit: int | None = None) SequenceData | SequenceDataList

Retrieve data from a sequence

Parameters
  • start (int) – Row number to start from (inclusive).

  • end (int | None) – Upper limit on the row number (exclusive). Set to None or -1 to get all rows until end of sequence.

  • column_external_ids (SequenceType[str] | None) – List of external ids for the columns of the sequence. If ‘None’ is passed, all columns will be retrieved.

  • external_id (str | SequenceType[str] | None) – External id of sequence.

  • id (int | SequenceType[int] | None) – Id of sequence.

  • limit (int | None) – Maximum number of rows to return per sequence. Pass None to fetch all (possibly limited by ‘end’).

Returns

SequenceData if single identifier was given, else SequenceDataList

Return type

SequenceData | SequenceDataList

Examples

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.data.retrieve(id=1, start=0, end=None)
>>> tuples = [(r,v) for r,v in res.items()] # You can use this iterator in for loops and list comprehensions,
>>> single_value = res[23] # ... get the values at a single row number,
>>> col = res.get_column(external_id='columnExtId') # ... get the array of values for a specific column,
>>> df = res.to_pandas() # ... or convert the result to a dataframe

Retrieve pandas dataframe

SequencesDataAPI.retrieve_dataframe(start: int, end: int | None, column_external_ids: list[str] | None = None, external_id: str | None = None, column_names: str | None = None, id: int | None = None, limit: int | None = None) pandas.DataFrame

Retrieve data from a sequence as a pandas dataframe

Parameters
  • start (int) – (inclusive) row number to start from.

  • end (int | None) – (exclusive) upper limit on the row number. Set to None or -1 to get all rows until end of sequence.

  • column_external_ids (list[str] | None) – List of external ids for the columns of the sequence. If ‘None’ is passed, all columns will be retrieved.

  • external_id (str | None) – External id of sequence.

  • column_names (str | None) – Which field(s) to use as column header. Can use “externalId”, “id”, “columnExternalId”, “id|columnExternalId” or “externalId|columnExternalId”. Default is “externalId|columnExternalId” for queries on more than one sequence, and “columnExternalId” for queries on a single sequence.

  • id (int | None) – Id of sequence

  • limit (int | None) – Maximum number of rows to return per sequence.

Returns

The requested sequence data in a pandas DataFrame.

Return type

pandas.DataFrame

Examples

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> df = c.sequences.data.retrieve_dataframe(id=1, start=0, end=None)
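
The column_names template controls the dataframe headers. For instance, to label each column with both the sequence external id and the column external id (the external id below is a placeholder):

>>> df = c.sequences.data.retrieve_dataframe(
...     external_id="my_sequence", start=0, end=None,
...     column_names="externalId|columnExternalId")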

Insert rows into a sequence

SequencesDataAPI.insert(rows: dict[int, SequenceType[int | float | str]] | SequenceType[tuple[int, SequenceType[int | float | str]]] | SequenceType[dict[str, Any]] | SequenceData, column_external_ids: SequenceType[str] | None, id: int | None = None, external_id: str | None = None) None

Insert rows into a sequence

Parameters
  • rows (dict[int, SequenceType[int | float | str]] | SequenceType[tuple[int, SequenceType[int | float | str]]] | SequenceType[dict[str, Any]] | SequenceData) – The rows you wish to insert. Can either be a list of tuples, a list of {“rowNumber”: …, “values”: …} objects, a dictionary of rowNumber: data, or a SequenceData object. See examples below.

  • column_external_ids (SequenceType[str] | None) – List of external ids for the columns of the sequence.

  • id (int | None) – Id of sequence to insert rows into.

  • external_id (str | None) – External id of sequence to insert rows into.

Examples

Your rows of data can be a list of tuples where the first element is the rownumber and the second element is the data to be inserted:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import Sequence
>>> c = CogniteClient()
>>> seq = c.sequences.create(Sequence(columns=[{"valueType": "STRING", "externalId": "col_a"}, {"valueType": "DOUBLE", "externalId": "col_b"}]))
>>> data = [(1, ["pi", 3.14]), (2, ["e", 2.72])]
>>> c.sequences.data.insert(column_external_ids=["col_a", "col_b"], rows=data, id=seq.id)

They can also be provided as a list of API-style objects with a rowNumber and values field:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = [{"rowNumber": 123, "values": ['str',3]}, {"rowNumber": 456, "values": ["bar",42]} ]
>>> c.sequences.data.insert(data, id=1, column_external_ids=["col_a","col_b"]) # implicit columns are retrieved from metadata

Or they can be given as a dictionary, with the row number as the key and the data to insert at that row as the value:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = {123: ['str', 3], 456: ['bar', 42]}
>>> c.sequences.data.insert(column_external_ids=['stringColumn', 'intColumn'], rows=data, id=1)

Finally, they can be a SequenceData object retrieved from another request. In this case, the column external ids from that object are used as well, so pass column_external_ids=None:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> data = c.sequences.data.retrieve(id=2, start=0, end=10)
>>> c.sequences.data.insert(rows=data, id=1, column_external_ids=None)

Insert a pandas dataframe into a sequence

SequencesDataAPI.insert_dataframe(dataframe: pandas.DataFrame, id: int | None = None, external_id: str | None = None) None

Insert a Pandas dataframe.

The index of the dataframe must contain the row numbers. The names of the remaining columns specify the column external ids. The sequence and columns must already exist.

Parameters
  • dataframe (pandas.DataFrame) – Pandas DataFrame object containing the sequence data.

  • id (int | None) – Id of sequence to insert rows into.

  • external_id (str | None) – External id of sequence to insert rows into.

Examples

Multiply data in the sequence by 2:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> df = c.sequences.data.retrieve_dataframe(id=123, start=0, end=None)
>>> c.sequences.data.insert_dataframe(df*2, id=123)
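
You can also build the dataframe from scratch. A minimal sketch, assuming a sequence "my_sequence" with columns "col_a" and "col_b" already exists:

>>> import pandas as pd
>>> df = pd.DataFrame(
...     {"col_a": ["pi", "e"], "col_b": [3.14, 2.72]},
...     index=[1, 2])  # the index holds the row numbers
>>> c.sequences.data.insert_dataframe(df, external_id="my_sequence")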

Delete rows from a sequence

SequencesDataAPI.delete(rows: SequenceType[int], id: int | None = None, external_id: str | None = None) None

Delete rows from a sequence

Parameters
  • rows (SequenceType[int]) – List of row numbers.

  • id (int | None) – Id of sequence to delete rows from.

  • external_id (str | None) – External id of sequence to delete rows from.

Examples

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.data.delete(id=1, rows=[1,2,42])

Delete a range of rows from a sequence

SequencesDataAPI.delete_range(start: int, end: int | None, id: int | None = None, external_id: str | None = None) None

Delete a range of rows from a sequence. Note that this operation is potentially slow, as it retrieves each row before deleting.

Parameters
  • start (int) – Row number to start from (inclusive).

  • end (int | None) – Upper limit on the row number (exclusive). Set to None or -1 to delete all rows until end of sequence.

  • id (int | None) – Id of sequence to delete rows from.

  • external_id (str | None) – External id of sequence to delete rows from.

Examples

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.sequences.data.delete_range(id=1, start=0, end=None)

Sequence Data classes

class cognite.client.data_classes.sequences.Sequence(id: int | None = None, name: str | None = None, description: str | None = None, asset_id: int | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, columns: SequenceType[dict[str, Any]] | None = None, created_time: int | None = None, last_updated_time: int | None = None, data_set_id: int | None = None, cognite_client: CogniteClient | None = None)

Information about the sequence stored in the database

Parameters
  • id (int | None) – Unique cognite-provided identifier for the sequence

  • name (str | None) – Name of the sequence

  • description (str | None) – Description of the sequence

  • asset_id (int | None) – Optional asset this sequence is associated with

  • external_id (str | None) – The external ID provided by the client. Must be unique for the resource type.

  • metadata (dict[str, Any] | None) – Custom, application specific metadata. String key -> String value. Maximum length of key is 32 bytes, value 512 bytes, up to 16 key-value pairs.

  • columns (SequenceType[dict[str, Any]] | None) – List of column definitions

  • created_time (int | None) – Time when this sequence was created in CDF in milliseconds since Jan 1, 1970.

  • last_updated_time (int | None) – The last time this sequence was updated in CDF, in milliseconds since Jan 1, 1970.

  • data_set_id (int | None) – Data set that this sequence belongs to

  • cognite_client (CogniteClient | None) – The client to associate with this object.

class cognite.client.data_classes.sequences.SequenceAggregate(count: int | None = None, **kwargs: Any)

Bases: dict

No description.

Parameters
  • count (int | None) – No description.

  • **kwargs (Any) – No description.

class cognite.client.data_classes.sequences.SequenceColumnUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

No description.

Parameters
  • id (int) – A server-generated ID for the object.

  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

class cognite.client.data_classes.sequences.SequenceData(id: int | None = None, external_id: str | None = None, rows: SequenceType[dict] | None = None, row_numbers: SequenceType[int] | None = None, values: SequenceType[SequenceType[int | str | float]] | None = None, columns: SequenceType[dict[str, Any]] | None = None)

Bases: CogniteResource

An object representing a list of rows from a sequence.

Parameters
  • id (int | None) – Id of the sequence the data belong to

  • external_id (str | None) – External id of the sequence the data belong to

  • rows (SequenceType[dict] | None) – Combined row numbers and row data object from the API. If you pass this, row_numbers/values are ignored.

  • row_numbers (SequenceType[int] | None) – The data row numbers.

  • values (SequenceType[SequenceType[int | str | float]] | None) – The data values, one row at a time.

  • columns (SequenceType[dict[str, Any]] | None) – The column information, in the format returned by the API.
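
A minimal sketch of constructing a SequenceData object by hand, for use with e.g. the data insert method; the column definitions shown are assumptions about the API column format:

>>> from cognite.client.data_classes import SequenceData
>>> data = SequenceData(
...     external_id="my_sequence",
...     row_numbers=[0, 1],
...     values=[["pi", 3.14], ["e", 2.72]],
...     columns=[{"externalId": "col_a", "valueType": "STRING"},
...              {"externalId": "col_b", "valueType": "DOUBLE"}])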

property column_external_ids: list[str]

Retrieves list of column external ids for the sequence, for use in e.g. data retrieve or insert methods.

Returns

List of sequence column external ids.

Return type

list[str]

property column_value_types: list[str]

Retrieves list of column value types.

Returns

List of column value types

Return type

list[str]

dump(camel_case: bool = False) dict[str, Any]

Dump the sequence data into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representing the instance.

Return type

dict[str, Any]

get_column(external_id: str) list[int | str | float]

Get a column by external_id.

Parameters

external_id (str) – External id of the column.

Returns

A list of values for that column in the sequence

Return type

list[int | str | float]

items() Iterator[tuple[int, list[int | str | float]]]

Returns an iterator over tuples of (row number, values).

to_pandas(column_names: str = 'columnExternalId') pandas.DataFrame

Convert the sequence data into a pandas DataFrame.

Parameters

column_names (str) – Which field(s) to use as column header. Can use “externalId”, “id”, “columnExternalId”, “id|columnExternalId” or “externalId|columnExternalId”.

Returns

The dataframe.

Return type

pandas.DataFrame

class cognite.client.data_classes.sequences.SequenceDataList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[SequenceData]

to_pandas(column_names: str = 'externalId|columnExternalId') pandas.DataFrame

Convert the sequence data list into a pandas DataFrame. Each column will be a sequence.

Parameters

column_names (str) – Which field to use as column header. Can use any combination of “externalId”, “columnExternalId”, “id” and other characters as a template.

Returns

The sequence data list as a pandas DataFrame.

Return type

pandas.DataFrame
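
For example, retrieving data for several sequences at once returns a SequenceDataList that can be converted in one call (the ids below are placeholders):

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.sequences.data.retrieve(id=[1, 2], start=0, end=None)
>>> df = res.to_pandas()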

class cognite.client.data_classes.sequences.SequenceFilter(name: str | None = None, external_id_prefix: str | None = None, metadata: dict[str, Any] | None = None, asset_ids: SequenceType[int] | None = None, asset_subtree_ids: SequenceType[dict[str, Any]] | None = None, created_time: dict[str, Any] | TimestampRange | None = None, last_updated_time: dict[str, Any] | TimestampRange | None = None, data_set_ids: SequenceType[dict[str, Any]] | None = None)

Bases: CogniteFilter

No description.

Parameters
  • name (str | None) – Return only sequences with this exact name.

  • external_id_prefix (str | None) – Filter by this (case-sensitive) prefix for the external ID.

  • metadata (dict[str, Any] | None) – Filter the sequences by metadata fields and values (case-sensitive). Format is {“key1”:”value1”,”key2”:”value2”}.

  • asset_ids (SequenceType[int] | None) – Return only sequences linked to one of the specified assets.

  • asset_subtree_ids (SequenceType[dict[str, Any]] | None) – Only include sequences that have a related asset in a subtree rooted at any of these assetIds (including the roots given). If the total size of the given subtrees exceeds 100,000 assets, an error will be returned.

  • created_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • last_updated_time (dict[str, Any] | TimestampRange | None) – Range between two timestamps.

  • data_set_ids (SequenceType[dict[str, Any]] | None) – Only include sequences that belong to these datasets.

class cognite.client.data_classes.sequences.SequenceList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[Sequence], IdTransformerMixin

class cognite.client.data_classes.sequences.SequenceProperty(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.sequences.SequenceUpdate(id: int | None = None, external_id: str | None = None)

Bases: CogniteUpdate

No description.

Parameters
  • id (int) – A server-generated ID for the object.

  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

class cognite.client.data_classes.sequences.SortableSequenceProperty(value)

Bases: EnumProperty

An enumeration.

Data Point Subscriptions

Warning

DataPoint Subscription is a new feature:
  • The API specification is in beta.

  • The SDK implementation is in alpha.

Thus, breaking changes may occur without further notice; see Alpha and Beta Features for more information.

Create data point subscriptions

DatapointsSubscriptionAPI.create(subscription: DataPointSubscriptionCreate) DatapointSubscription

Create a subscription

Create a subscription that can be used to listen for changes in data points for a set of time series.

Parameters

subscription (DataPointSubscriptionCreate) – Subscription to create.

Returns

Created subscription

Return type

DatapointSubscription

Examples

Create a subscription with explicit time series IDs:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionCreate
>>> c = CogniteClient()
>>> sub = DataPointSubscriptionCreate("mySubscription", partition_count=1, time_series_ids=["myFirstTimeSeries", "mySecondTimeSeries"], name="My subscription")
>>> created = c.time_series.subscriptions.create(sub)

Create a filter defined subscription for all numeric time series:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionCreate
>>> from cognite.client.data_classes import filters
>>> from cognite.client.data_classes.datapoints_subscriptions import DatapointSubscriptionFilterProperties
>>> c = CogniteClient()
>>> f = filters
>>> p = DatapointSubscriptionFilterProperties
>>> numeric_timeseries = f.Equals(p.is_string, False)
>>> sub = DataPointSubscriptionCreate("mySubscription", partition_count=1, filter=numeric_timeseries, name="My subscription for Numeric time series")
>>> created = c.time_series.subscriptions.create(sub)

Retrieve a data point subscription by external id

DatapointsSubscriptionAPI.retrieve(external_id: str, ignore_unknown_ids: bool = False) DatapointSubscription | None

Retrieve one subscription by external ID.

Parameters
  • external_id (str) – External ID of the subscription to retrieve.

  • ignore_unknown_ids (bool) – Whether to ignore IDs and external IDs that are not found rather than throw an exception.

Returns

The requested subscription.

Return type

DatapointSubscription | None

Examples

Retrieve a subscription by external ID:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> res = c.time_series.subscriptions.retrieve("my_subscription")

List data point subscriptions

DatapointsSubscriptionAPI.list(limit: int | None = 25) DatapointSubscriptionList

List data point subscriptions

Parameters

limit (int | None) – Maximum number of subscriptions to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

List of requested datapoint subscriptions

Return type

DatapointSubscriptionList

Examples

List 5 subscriptions:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> subscriptions = c.time_series.subscriptions.list(limit=5)

List member time series of subscription

DatapointsSubscriptionAPI.list_member_time_series(external_id: str, limit: int | None = 25) TimeSeriesIDList

List time series in a subscription

Retrieve a list of time series (IDs) that the subscription is currently retrieving updates from

Parameters
  • external_id (str) – External ID of the subscription to retrieve members of.

  • limit (int | None) – Maximum number of time series to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

List of time series in the subscription.

Return type

TimeSeriesIDList

Examples

List time series in a subscription:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> members = c.time_series.subscriptions.list_member_time_series("my_subscription")
>>> timeseries_external_ids = members.as_external_ids()

Iterate over subscriptions data

DatapointsSubscriptionAPI.iterate_data(external_id: str, start: str | None = None, limit: int = 25, partition: int = 0, cursor: str | None = None) Iterator[DatapointSubscriptionBatch]

Iterate over data from a given subscription.

The data may include both newly ingested datapoints and time ranges where data has been deleted. This endpoint also returns changes to the subscription itself, that is, whether time series have been added to or removed from the subscription.

Warning

This endpoint will store updates from when the subscription was created, but updates older than 7 days may be discarded.

Parameters
  • external_id (str) – The external ID of the subscription.

  • start (str | None) – When to start the iteration. If set to None, the iteration will start from the beginning. The format is “N[timeunit]-ago”, where timeunit is w,d,h,m (week, day, hour, minute). For example, “12h-ago” will start the iteration from 12 hours ago. You can also set it to “now” to jump straight to the end. Defaults to None.

  • limit (int) – Approximate number of results to return across all partitions.

  • partition (int) – The partition to iterate over. Defaults to 0.

  • cursor (str | None) – Optional cursor to start iterating from.

Yields

DatapointSubscriptionBatch – Changes to the subscription and data in the subscribed time series.

Examples

Iterate over changes to the subscription's time series from the beginning, until there is no more data:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> for batch in c.time_series.subscriptions.iterate_data("my_subscription"):
...     print(f"Added {len(batch.subscription_changes.added)} timeseries")
...     print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
...     print(f"Changed timeseries data in {len(batch.updates)} updates")
...     if not batch.has_next:
...         break

Iterate continuously over all changes to the subscription newer than 3 days:

>>> import time
>>> for batch in c.time_series.subscriptions.iterate_data("my_subscription", "3d-ago"):
...     print(f"Added {len(batch.subscription_changes.added)} timeseries")
...     print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
...     print(f"Changed timeseries data in {len(batch.updates)} updates")
...     if not batch.has_next:
...         time.sleep(1)
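
To resume iteration across separate runs, you can persist the cursor of the last processed batch and pass it back in. A minimal sketch; the storage mechanism is up to you:

>>> last_cursor = None  # load from your own persistent storage, if any
>>> for batch in c.time_series.subscriptions.iterate_data(
...         "my_subscription", cursor=last_cursor):
...     last_cursor = batch.cursor  # persist this to resume from here next run
...     if not batch.has_next:
...         break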

Update data point subscription

DatapointsSubscriptionAPI.update(update: DataPointSubscriptionUpdate) DatapointSubscription

Update a subscription

Update a subscription. Note that fields that are not included in the request are not changed. Furthermore, the partition count of a subscription cannot be changed.

Parameters

update (DataPointSubscriptionUpdate) – The subscription update.

Returns

Updated subscription.

Return type

DatapointSubscription

Examples

Change the name of a preexisting subscription:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> update = DataPointSubscriptionUpdate("my_subscription").name.set("My New Name")
>>> updated = c.time_series.subscriptions.update(update)

Add a time series to a preexisting subscription:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionUpdate
>>> c = CogniteClient()
>>> update = DataPointSubscriptionUpdate("my_subscription").time_series_ids.add(["MyNewTimeSeriesExternalId"])
>>> updated = c.time_series.subscriptions.update(update)

Delete data point subscription

DatapointsSubscriptionAPI.delete(external_id: str | Sequence[str], ignore_unknown_ids: bool = False) None

Delete subscription(s). This operation cannot be undone.

Parameters
  • external_id (str | Sequence[str]) – External ID or list of external IDs of subscriptions to delete.

  • ignore_unknown_ids (bool) – Whether to ignore IDs and external IDs that are not found rather than throw an exception.

Examples

Delete a subscription by external ID:

>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> c.time_series.subscriptions.delete("my_subscription")

Data Point Subscription classes

class cognite.client.data_classes.datapoints_subscriptions.DataDeletion(inclusive_begin: 'int', exclusive_end: 'int | None')

Bases: object

class cognite.client.data_classes.datapoints_subscriptions.DataPointSubscriptionCreate(external_id: str, partition_count: int, time_series_ids: list[ExternalId] | None = None, filter: Filter | None = None, name: str | None = None, description: str | None = None)

Bases: DatapointSubscriptionCore

A data point subscription is a way to listen to changes to time series data points, in ingestion order.

This is the write version of a subscription, used to create new subscriptions.

A subscription can either be defined directly by a list of time series ids or indirectly by a filter.

Parameters
  • external_id (str) – Externally provided ID for the subscription. Must be unique.

  • partition_count (int) – The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead. The partition count must be between 1 and 100. CAVEAT: This cannot be changed after the subscription has been created.

  • time_series_ids (list[ExternalId] | None) – List of (external) ids of time series that this subscription will listen to. Not compatible with filter.

  • filter (Filter | None) – A filter DSL (Domain Specific Language) to define advanced filter queries. Not compatible with time_series_ids.

  • name (str | None) – No description.

  • description (str | None) – A summary explanation for the subscription.

class cognite.client.data_classes.datapoints_subscriptions.DataPointSubscriptionUpdate(external_id: str)

Bases: CogniteUpdate

Changes applied to datapoint subscription

Parameters

external_id (str) – The external ID provided by the client. Must be unique for the resource type.

class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscription(external_id: ExternalId, partition_count: int, created_time: int, last_updated_time: int, time_series_count: int, filter: Filter | None = None, name: str | None = None, description: str | None = None, **_: Any)

Bases: DatapointSubscriptionCore

A data point subscription is a way to listen to changes to time series data points, in ingestion order.

This is the read version of a subscription, used when reading subscriptions from CDF.

Parameters
  • external_id (ExternalId) – Externally provided ID for the subscription. Must be unique.

  • partition_count (int) – The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead.

  • created_time (int) – Time when the subscription was created in CDF in milliseconds since Jan 1, 1970.

  • last_updated_time (int) – Time when the subscription was last updated in CDF in milliseconds since Jan 1, 1970.

  • time_series_count (int) – The number of time series in the subscription.

  • filter (Filter | None) – If present, the subscription is defined by this filter.

  • name (str | None) – No description.

  • description (str | None) – A summary explanation for the subscription.

  • **_ (Any) – No description.

class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionBatch(updates: 'list[DatapointsUpdate]', subscription_changes: 'SubscriptionTimeSeriesUpdate', has_next: 'bool', cursor: 'str')

Bases: object

class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionFilterProperties(value)

Bases: EnumProperty

An enumeration.

class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[DatapointSubscription]

class cognite.client.data_classes.datapoints_subscriptions.DatapointSubscriptionPartition(index: 'int', cursor: 'str | None' = None)

Bases: object

class cognite.client.data_classes.datapoints_subscriptions.DatapointsUpdate(time_series: 'TimeSeriesID', upserts: 'Datapoints', deletes: 'list[DataDeletion]')

Bases: object

class cognite.client.data_classes.datapoints_subscriptions.SubscriptionTimeSeriesUpdate(added: 'list[TimeSeriesID]', removed: 'list[TimeSeriesID]')

Bases: object

class cognite.client.data_classes.datapoints_subscriptions.TimeSeriesID(id: int, external_id: ExternalId | None = None)

Bases: CogniteResource

A TimeSeries Identifier to uniquely identify a time series.

Parameters
  • id (int) – A server-generated ID for the object.

  • external_id (ExternalId | None) – The external ID provided by the client. Must be unique for the resource type.

dump(camel_case: bool = False) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to False.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.datapoints_subscriptions.TimeSeriesIDList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[TimeSeriesID], IdTransformerMixin