Retrieve datapoints
- async AsyncCogniteClient.time_series.data.retrieve(
- *,
- id: None | int | DatapointsQuery | Sequence[int | DatapointsQuery] = None,
- external_id: None | str | DatapointsQuery | SequenceNotStr[str | DatapointsQuery] = None,
- instance_id: None | NodeId | DatapointsQuery | Sequence[NodeId | DatapointsQuery] = None,
- start: int | str | datetime | None = None,
- end: int | str | datetime | None = None,
- aggregates: Literal['average', 'continuous_variance', 'count', 'count_bad', 'count_good', 'count_uncertain', 'discrete_variance', 'duration_bad', 'duration_good', 'duration_uncertain', 'interpolation', 'max', 'max_datapoint', 'min', 'min_datapoint', 'step_interpolation', 'sum', 'total_variation'] | str | list[Literal['average', 'continuous_variance', 'count', 'count_bad', 'count_good', 'count_uncertain', 'discrete_variance', 'duration_bad', 'duration_good', 'duration_uncertain', 'interpolation', 'max', 'max_datapoint', 'min', 'min_datapoint', 'step_interpolation', 'sum', 'total_variation'] | str] | None = None,
- granularity: str | None = None,
- timezone: str | timezone | ZoneInfo | None = None,
- target_unit: str | None = None,
- target_unit_system: str | None = None,
- limit: int | None = None,
- include_outside_points: bool = False,
- ignore_unknown_ids: bool = False,
- include_status: bool = False,
- ignore_bad_datapoints: bool = True,
- treat_uncertain_as_bad: bool = True,
- ) -> Datapoints | DatapointsList | None
Retrieve datapoints for one or more time series.
- Performance guide:
In order to retrieve millions of datapoints as efficiently as possible, here are a few guidelines:
Make one call to retrieve and fetch all time series in one go, rather than making multiple calls (if your memory allows it). The SDK will optimize the retrieval strategy for you!
For best speed, and significantly lower memory usage, consider using retrieve_arrays(...), which uses numpy.ndarrays for data storage.
Unlimited queries (limit=None) are most performant as they are always fetched in parallel, for any number of requested time series, even one.
Limited queries (e.g. limit=500_000) are much less performant, at least for large limits, as each individual time series is fetched serially (we can’t predict where on the timeline the datapoints are). Thus parallelisation is only used when asking for multiple “limited” time series.
Try to avoid specifying start and end to be very far from the actual data: If you have data from 2000 to 2015, don’t use start=0 (1970).
Using timezone and/or calendar granularities like month/quarter/year in aggregate queries comes at a penalty as they are expensive for the API to compute.
Warning
When using the AsyncCogniteClient, always await the result of this method and never run multiple calls concurrently (e.g. using asyncio.gather). You can pass as many queries as you like to a single call, and the SDK will optimize the retrieval strategy for you intelligently.
Tip
To read datapoints efficiently, while keeping a low memory footprint e.g. to copy from one project to another, check out __call__(). It allows you to iterate through datapoints in chunks, and also control how many time series to iterate at the same time.
Time series support status codes like Good, Uncertain and Bad. You can read more in the Cognite Data Fusion developer documentation on status codes.
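As a rough sketch of the pattern the warning describes, the snippet below awaits a single call that covers several time series instead of gathering one call per series. It only uses names that appear elsewhere on this page; the function names fetch_recent and main are illustrative, and it assumes client configuration is already in place.

```python
import asyncio


async def fetch_recent(ids: list[int]):
    """Illustrative helper: fetch two weeks of raw data for all ids in ONE call."""
    # Imported lazily so this sketch can be loaded without the SDK installed.
    from cognite.client import AsyncCogniteClient

    client = AsyncCogniteClient()  # assumes client configuration is already set up
    # One awaited call; the SDK decides how to parallelise the retrieval.
    return await client.time_series.data.retrieve(id=ids, start="2w-ago")


def main() -> None:
    # Avoid: asyncio.gather(*(client.time_series.data.retrieve(id=i) for i in ids))
    dps_lst = asyncio.run(fetch_recent([42, 43, 44]))
    print(dps_lst)
```

Calling main() requires the SDK and valid credentials; nothing is fetched when the snippet is merely imported.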
- Parameters:
id (None | int | DatapointsQuery | Sequence[int | DatapointsQuery]) – Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | DatapointsQuery | SequenceNotStr[str | DatapointsQuery]) – External id, dict (with external id) or (mixed) sequence of these. See examples below.
instance_id (None | NodeId | DatapointsQuery | Sequence[NodeId | DatapointsQuery]) – Instance id or sequence of instance ids.
start (int | str | datetime.datetime | None) – Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime.datetime | None) – Exclusive end. Default: “now”
aggregates (Aggregate | str | list[Aggregate | str] | None) – Single aggregate or list of aggregates to retrieve. Available options: average, continuous_variance, count, count_bad, count_good, count_uncertain, discrete_variance, duration_bad, duration_good, duration_uncertain, interpolation, max, max_datapoint, min, min_datapoint, step_interpolation, sum and total_variation. Default: None (raw datapoints returned)
granularity (str | None) – The granularity to fetch aggregates at. Can be given as an abbreviation or spelled out for clarity: s/second(s), m/minute(s), h/hour(s), d/day(s), w/week(s), mo/month(s), q/quarter(s), or y/year(s). Examples: 30s, 5m, 1day, 2weeks. Default: None.
timezone (str | datetime.timezone | ZoneInfo | None) – For raw datapoints, which timezone to use when displaying (will not affect what is retrieved). For aggregates, which timezone to align to for granularity ‘hour’ and longer. Align to the start of the hour, day or month. For timezones of type Region/Location, like ‘Europe/Oslo’, pass a string or ZoneInfo instance. The aggregate duration will then vary, typically due to daylight saving time. You can also use a fixed offset from UTC by passing a string like ‘+04:00’, ‘UTC-7’ or ‘UTC-02:30’ or an instance of datetime.timezone. Note: Historical timezones with second offset are not supported, and timezones with minute offsets (e.g. UTC+05:30 or Asia/Kolkata) may take longer to execute.
target_unit (str | None) – The unit_external_id of the datapoints returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None) – The unit system of the datapoints returned. Cannot be used with target_unit.
limit (int | None) – Maximum number of datapoints to return for each time series. Default: None (no limit)
include_outside_points (bool) – Whether to include outside points. Not allowed when fetching aggregates. Default: False
ignore_unknown_ids (bool) – Whether to ignore missing time series rather than raising an exception. Default: False
include_status (bool) – Also return the status code, an integer, for each datapoint in the response. Only relevant for raw datapoint queries, and the object aggregates min_datapoint and max_datapoint.
ignore_bad_datapoints (bool) – Treat datapoints with a bad status code as if they do not exist. If set to false, raw queries will include bad datapoints in the response, and aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Default: True.
treat_uncertain_as_bad (bool) – Treat datapoints with uncertain status codes as bad. If false, treat datapoints with uncertain status codes as good. Used for both raw queries and aggregates. Default: True.
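The accepted granularity spellings listed above can be checked locally before sending a query. The helper below is a hypothetical sketch, not the SDK's own parser: split_granularity and its lookup table are names introduced here, and requiring a leading integer multiplier is an assumption based on the documented examples (30s, 5m, 1day, 2weeks).

```python
import re

# Map every documented spelling to its abbreviation.
_UNITS = {
    "s": "s", "second": "s", "seconds": "s",
    "m": "m", "minute": "m", "minutes": "m",
    "h": "h", "hour": "h", "hours": "h",
    "d": "d", "day": "d", "days": "d",
    "w": "w", "week": "w", "weeks": "w",
    "mo": "mo", "month": "mo", "months": "mo",
    "q": "q", "quarter": "q", "quarters": "q",
    "y": "y", "year": "y", "years": "y",
}
_PATTERN = re.compile(r"(\d+)([a-z]+)")


def split_granularity(granularity: str) -> tuple[int, str]:
    """Hypothetical helper: split e.g. '2weeks' into (2, 'w')."""
    match = _PATTERN.fullmatch(granularity.strip().lower())
    if match is None or match.group(2) not in _UNITS:
        raise ValueError(f"Unrecognised granularity: {granularity!r}")
    multiplier = int(match.group(1))
    if multiplier < 1:
        raise ValueError("Multiplier must be at least 1")
    return multiplier, _UNITS[match.group(2)]
```

For instance, split_granularity("1month") gives (1, "mo"), which also makes it easy to spot the calendar granularities (mo, q, y) that the performance guide flags as more expensive.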
- Returns:
A Datapoints object containing the requested data, or a DatapointsList if multiple time series were asked for (the ordering is ids first, then external_ids). If ignore_unknown_ids is True and a single time series is requested but not found, the function will return None.
- Return type:
Datapoints | DatapointsList | None
Examples
You can specify the identifiers of the datapoints you wish to retrieve in a number of ways. In this example we are using the time-ago format, "2w-ago", to get raw data for the time series with id=42 from 2 weeks ago up until now. You can also use the time-ahead format, like "3d-ahead", to specify a relative time in the future.
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> dps = client.time_series.data.retrieve(id=42, start="2w-ago")
>>> # You can also use instance_id:
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> dps = client.time_series.data.retrieve(instance_id=NodeId("ts-space", "foo"))
Although raw datapoints are returned by default, you can also get aggregated values, such as max or average. You may also fetch more than one time series simultaneously. Here we are getting daily averages and maximum values for all of 2018, for two different time series, where we’re specifying start and end as integers (milliseconds after epoch). In the below example, we fetch them using their external ids:
>>> dps_lst = client.time_series.data.retrieve(
...     external_id=["foo", "bar"],
...     start=1514764800000,
...     end=1546300800000,
...     aggregates=["max", "average"],
...     granularity="1d",
... )
In the two code examples above, we have a dps object (an instance of Datapoints), and a dps_lst object (an instance of DatapointsList). On dps, which in this case contains raw datapoints, you may access the underlying data directly by using the .value attribute. This works for both numeric and string (raw) datapoints, but not aggregates - they must be accessed by their respective names, because you’re allowed to fetch all available aggregates simultaneously, and they are stored on the same object:
>>> raw_data = dps.value
>>> first_dps = dps_lst[0]  # optionally: `dps_lst.get(external_id="foo")`
>>> avg_data = first_dps.average
>>> max_data = first_dps.max
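The start and end values in the queries above were given in two forms: a relative string such as "2w-ago" and integers in milliseconds since epoch. The sketch below shows how both relate to an ordinary datetime. The names to_epoch_ms and resolve_relative are hypothetical helpers written for illustration, not SDK functions, and the relative parser only handles the d and w units that appear on this page.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
_UNIT_TO_DELTA = {"d": timedelta(days=1), "w": timedelta(weeks=1)}


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since epoch."""
    # Integer division on timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def resolve_relative(spec: str, now: datetime) -> datetime:
    """Hypothetical helper: resolve '2w-ago' or '3d-ahead' relative to `now`."""
    match = re.fullmatch(r"(\d+)([dw])-(ago|ahead)", spec)
    if match is None:
        raise ValueError(f"Unsupported relative time: {spec!r}")
    amount, unit, direction = match.groups()
    delta = int(amount) * _UNIT_TO_DELTA[unit]
    return now - delta if direction == "ago" else now + delta


start_2018 = to_epoch_ms(datetime(2018, 1, 1, tzinfo=timezone.utc))
end_2018 = to_epoch_ms(datetime(2019, 1, 1, tzinfo=timezone.utc))
```

Here start_2018 and end_2018 reproduce the integers used for "all of 2018" in the aggregate example; because end is exclusive, the first millisecond of 2019 is the right upper bound.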
You may also slice a Datapoints object (you get Datapoints back), or ask for “a row of data” at a single index in the same way you would do with a built-in list (you get a Datapoint object back, note the singular name). You’ll also get Datapoint objects when iterating through a Datapoints object, but this should generally be avoided (consider this a performance warning):
>>> dps_slice = dps[-10:]  # Last ten values
>>> dp = dps[3]  # The fourth value (indexing is zero-based)
>>> for dp in dps_slice:
...     pass  # do something!
All parameters can be individually set if you use and pass DatapointsQuery objects (even ignore_unknown_ids, contrary to the API). If you also pass top-level parameters, these will be overruled by the individual parameters (where both exist, so think of these as defaults). You are free to mix any kind of ids and external ids: Single identifiers, single DatapointsQuery objects and (mixed) lists of these.
Let’s say you want different aggregates and end-times for a few time series (when only fetching a single aggregate, you may pass the string directly for convenience):
>>> from cognite.client.data_classes import DatapointsQuery
>>> dps_lst = client.time_series.data.retrieve(
...     id=[
...         DatapointsQuery(id=42, end="1d-ago", aggregates="average"),
...         DatapointsQuery(id=69, end="2d-ahead", aggregates=["average"]),
...         DatapointsQuery(id=96, end="3d-ago", aggregates=["min", "max", "count"]),
...     ],
...     external_id=DatapointsQuery(external_id="foo", aggregates="max"),
...     start="5d-ago",
...     granularity="1h",
... )
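To make the "top-level parameters act as defaults" rule concrete, here is a small illustration using plain dictionaries. It is not how the SDK implements it: effective_settings is a hypothetical helper, and treating None as "not set" is an assumption made for this sketch.

```python
from typing import Any


def effective_settings(top_level: dict[str, Any], per_query: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical illustration: per-query values win, top-level values fill the gaps."""
    merged = dict(top_level)
    # Assumption: a value of None on the query means "not set".
    merged.update({key: value for key, value in per_query.items() if value is not None})
    return merged


top_level = {"start": "5d-ago", "granularity": "1h"}
queries = [
    {"id": 42, "end": "1d-ago", "aggregates": "average"},
    {"id": 96, "end": "3d-ago", "aggregates": ["min", "max", "count"]},
]
resolved = [effective_settings(top_level, query) for query in queries]
```

Each entry in resolved carries the shared start and granularity together with its own end and aggregates, which mirrors how the query above is interpreted.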
Certain aggregates are very useful when they follow the calendar, for example electricity consumption per day, week, month or year. You may request such calendar-based aggregates in a specific timezone to make them even more useful: daylight saving time (DST) will be taken care of automatically and the datapoints will be aligned to the timezone. Note: Calendar granularities and timezone can be used independently. To get monthly local aggregates in Oslo, Norway you can do:
>>> dps = client.time_series.data.retrieve(
...     id=123, aggregates="sum", granularity="1month", timezone="Europe/Oslo"
... )
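The timezone parameter also accepts a fixed UTC offset. As a minimal sketch of what "aligning to the timezone" means for a daily aggregate, the snippet below computes where a local day boundary falls in UTC for a +04:00 offset. The date is arbitrary and chosen only for illustration.

```python
from datetime import datetime, timedelta, timezone

# Same offset as passing the string '+04:00'.
plus_four = timezone(timedelta(hours=4))

# Start of a daily bucket in the fixed-offset zone...
local_day_start = datetime(2024, 3, 10, tzinfo=plus_four)
# ...expressed in UTC, it falls four hours before UTC midnight.
utc_day_start = local_day_start.astimezone(timezone.utc)
```

With a fixed offset every daily bucket is exactly 24 hours long; with a Region/Location timezone such as Europe/Oslo the bucket length can vary around DST transitions, as noted in the parameter description.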
When requesting multiple time series, an easy way to get the datapoints of a specific one is to use the .get method on the returned DatapointsList object, then specify if you want id or external_id. Note: If you fetch a time series by using id, you can still access it with its external_id (and the opposite way around), if you know it:
>>> from datetime import datetime, timezone
>>> utc = timezone.utc
>>> dps_lst = client.time_series.data.retrieve(
...     start=datetime(1907, 10, 14, tzinfo=utc),
...     end=datetime(1907, 11, 6, tzinfo=utc),
...     id=[42, 43, 44, ..., 499, 500],
... )
>>> ts_350 = dps_lst.get(id=350)  # ``Datapoints`` object
…but what happens if you request some duplicate ids or external_ids? In this example we will show how to get data from multiple disconnected periods. Let’s say you’re tasked to train a machine learning model to recognize a specific failure mode of a system, and you want the training data to only be from certain periods (when an alarm was on/high). Assuming these alarms are stored as events in CDF, with both start- and end times, we can use these directly in the query.
After fetching, the .get method will return a list of Datapoints instead (assuming we have more than one event), in the same order, similar to how slicing works with non-unique indices on Pandas DataFrames:
>>> periods = client.events.list(type="alarm", subtype="pressure")
>>> sensor_xid = "foo-pressure-bar"
>>> dps_lst = client.time_series.data.retrieve(
...     id=[42, 43, 44],
...     external_id=[
...         DatapointsQuery(external_id=sensor_xid, start=ev.start_time, end=ev.end_time)
...         for ev in periods
...     ],
... )
>>> ts_44 = dps_lst.get(id=44)  # Single ``Datapoints`` object
>>> ts_lst = dps_lst.get(
...     external_id=sensor_xid
... )  # List of ``len(periods)`` ``Datapoints`` objects
The API has an endpoint to retrieve the latest datapoint “before” a given time, retrieve_latest(), but no counterpart for the first datapoint “after” a given time. Luckily, we can emulate that behaviour easily. Let’s say we have a very dense time series and do not want to fetch all of the available raw data (or fetch less precise aggregate data), just to get the very first datapoint of every month (from e.g. the year 2000 through 2010):
>>> import itertools
>>> month_starts = [
...     datetime(year, month, 1, tzinfo=utc)
...     for year, month in itertools.product(range(2000, 2011), range(1, 13))
... ]
>>> dps_lst = client.time_series.data.retrieve(
...     external_id=[
...         DatapointsQuery(external_id="foo", start=start) for start in month_starts
...     ],
...     limit=1,
... )
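One thing to keep in mind with this pattern: since end defaults to “now”, a query for a month without any data would presumably return the first datapoint of some later month. If that matters, each query can be bounded by the next month start. The sketch below builds such half-open windows using only the standard library; the variable names are illustrative.

```python
import itertools
from datetime import datetime, timezone

utc = timezone.utc
# Month starts for 2000 through 2010, plus January 2011 as the closing boundary.
boundaries = [
    datetime(year, month, 1, tzinfo=utc)
    for year, month in itertools.product(range(2000, 2011), range(1, 13))
] + [datetime(2011, 1, 1, tzinfo=utc)]

# Pair each month start with the next one to get [start, end) windows.
month_windows = list(zip(boundaries, boundaries[1:]))
```

Each (start, end) pair could then be passed as DatapointsQuery(external_id="foo", start=start, end=end), still combined with limit=1.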
To get all historic and future datapoints for a time series, e.g. to do a backup, you may want to import the two integer constants MIN_TIMESTAMP_MS and MAX_TIMESTAMP_MS, to make sure you do not miss any. Performance warning: This pattern of fetching datapoints from the entire valid time domain is slower and shouldn’t be used for regular “day-to-day” queries:
>>> from cognite.client.utils import MIN_TIMESTAMP_MS, MAX_TIMESTAMP_MS
>>> dps_backup = client.time_series.data.retrieve(
...     id=123, start=MIN_TIMESTAMP_MS, end=MAX_TIMESTAMP_MS + 1
... )  # end is exclusive
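The + 1 in the backup query follows from the documented range semantics: start is inclusive and end is exclusive. A tiny sketch of that rule, using a made-up stand-in value rather than the real constant:

```python
def in_range(timestamp_ms: int, start: int, end: int) -> bool:
    """Mirror the documented semantics: start is inclusive, end is exclusive."""
    return start <= timestamp_ms < end


last_valid = 1_000  # stand-in for the largest timestamp you want to include

excluded = in_range(last_valid, 0, last_valid)      # end == last_valid drops it
included = in_range(last_valid, 0, last_valid + 1)  # end + 1 keeps it
```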
If you have a time series with ‘unit_external_id’ set, you can use the ‘target_unit’ parameter to convert the datapoints to the desired unit. In the example below, we are converting temperature readings from a sensor measured and stored in Celsius, to Fahrenheit (we’re assuming that the time series has e.g. unit_external_id="temperature:deg_c"):
>>> client.time_series.data.retrieve(
...     id=42, start="2w-ago", target_unit="temperature:deg_f"
... )
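The conversion is done for you when target_unit is set. Purely as a sanity check on the returned values, the standard Celsius-to-Fahrenheit formula can be applied locally; the function name below is illustrative and not part of the SDK.

```python
def celsius_to_fahrenheit(celsius: float) -> float:
    """Standard conversion: multiply by 9/5, then add 32."""
    return celsius * 9 / 5 + 32


boiling_f = celsius_to_fahrenheit(100)
freezing_f = celsius_to_fahrenheit(0)
```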
Or alternatively, you can use the ‘target_unit_system’ parameter to convert the datapoints to the desired unit system:
>>> client.time_series.data.retrieve(
...     id=42, start="2w-ago", target_unit_system="Imperial"
... )
To retrieve status codes for a time series, pass include_status=True. This is only possible for raw datapoint queries (and the object aggregates min_datapoint and max_datapoint). You would typically also pass ignore_bad_datapoints=False to not hide all the datapoints that are marked as uncertain or bad, which is the API’s default behaviour. You may also use treat_uncertain_as_bad to control how uncertain values are interpreted.
>>> dps = client.time_series.data.retrieve(
...     id=42, include_status=True, ignore_bad_datapoints=False
... )
>>> dps.status_code  # list of integer codes, e.g.: [0, 1073741824, 2147483648]
>>> dps.status_symbol  # list of symbolic representations, e.g. [Good, Uncertain, Bad]
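The three example integers above (0, 1073741824 and 2147483648) differ only in their two most significant bits. Assuming the codes follow the OPC UA convention, where those two bits of the 32-bit code carry the category, a status code can be classified as sketched below. The helper status_category is hypothetical; in practice dps.status_symbol already gives you the symbolic form.

```python
_CATEGORIES = {0b00: "Good", 0b01: "Uncertain", 0b10: "Bad"}


def status_category(status_code: int) -> str:
    """Hypothetical helper: classify a 32-bit status code by its top two bits."""
    severity = (status_code >> 30) & 0b11
    if severity not in _CATEGORIES:
        # Assumption: the 0b11 pattern is not a valid category.
        raise ValueError(f"Unexpected status code: {status_code}")
    return _CATEGORIES[severity]


categories = [status_category(code) for code in (0, 1073741824, 2147483648)]
```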
There are six aggregates directly related to status codes, three for count: ‘count_good’, ‘count_uncertain’ and ‘count_bad’, and three for duration: ‘duration_good’, ‘duration_uncertain’ and ‘duration_bad’. These may be fetched as any other aggregate. It is important to note that status codes may influence how other aggregates are computed: Aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Put simply, what ‘average’ returns depends on your settings for ‘ignore_bad_datapoints’ and ‘treat_uncertain_as_bad’ (in the presence of uncertain/bad datapoints).