Data Workflows

Warning

Data Workflows is a new feature:
  • The API specification is in beta.

  • The SDK implementation is in alpha.

Thus, breaking changes may occur without further notice, see Alpha and Beta Features for more information.

Workflows

Upsert Workflow

WorkflowAPI.upsert(workflow: WorkflowUpsert, mode: Literal['replace'] = 'replace') Workflow

Create a workflow.

Note this is an upsert endpoint, so if a workflow with the same external id already exists, it will be updated.

Parameters
  • workflow (WorkflowUpsert) – The workflow to create or update.

  • mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.

Returns

The created workflow.

Return type

Workflow

Examples

Create workflow my workflow:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowUpsert
>>> client = CogniteClient()
>>> res = client.workflows.upsert(WorkflowUpsert(external_id="my workflow", description="my workflow description"))

Delete Workflow(s)

WorkflowAPI.delete(external_id: str | SequenceNotStr[str], ignore_unknown_ids: bool = False) None

Delete one or more workflows with versions.

Parameters
  • external_id (str | SequenceNotStr[str]) – External id or list of external ids to delete.

  • ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.

Examples

Delete workflow my workflow:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> client.workflows.delete("my workflow")

Retrieve Workflow

WorkflowAPI.retrieve(external_id: str) Workflow | None

Retrieve a workflow.

Parameters

external_id (str) – Identifier for a Workflow. Must be unique for the project.

Returns

The requested workflow if it exists, None otherwise.

Return type

Workflow | None

Examples

Retrieve workflow my workflow:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.retrieve("my workflow")

List Workflows

WorkflowAPI.list(limit: int = 25) WorkflowList

List all workflows in the project.

Parameters

limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None

Returns

All workflows in the CDF project.

Return type

WorkflowList

Examples

List all workflows:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.list()

Workflow Versions

Upsert Workflow Version

WorkflowVersionAPI.upsert(version: WorkflowVersionUpsert, mode: Literal['replace'] = 'replace') WorkflowVersion

Create a workflow version.

Note this is an upsert endpoint, so if a workflow with the same version external id already exists, it will be updated.

Furthermore, if the workflow does not exist, it will be created.

Parameters
  • version (WorkflowVersionUpsert) – The workflow version to create or update.

  • mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.

Returns

The created workflow version.

Return type

WorkflowVersion

Examples

Create workflow version with one Function task:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters
>>> client = CogniteClient()
>>> new_version = WorkflowVersionUpsert(
...    workflow_external_id="my_workflow",
...    version="1",
...    workflow_definition=WorkflowDefinitionUpsert(
...        tasks=[
...            WorkflowTask(
...                external_id="my_workflow-task1",
...                parameters=FunctionTaskParameters(
...                    external_id="cdf_deployed_function:my_function",
...                    data={"a": 1, "b": 2},
...                ),
...            )
...        ],
...        description="This workflow has one step",
...    ),
... )
>>> res = client.workflows.versions.upsert(new_version)

Delete Workflow Version(s)

WorkflowVersionAPI.delete(workflow_version_id: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier], ignore_unknown_ids: bool = False) None

Delete a workflow version(s).

Parameters
  • workflow_version_id (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier]) – Workflow version id or list of workflow version ids to delete.

  • ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.

Examples

Delete workflow version “1” of workflow “my workflow” specified by using a tuple:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> client.workflows.versions.delete(("my workflow", "1"))

Delete workflow version “1” of workflow “my workflow” and workflow version “2” of workflow “my workflow 2” using the WorkflowVersionId class:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowVersionId
>>> client = CogniteClient()
>>> client.workflows.versions.delete([WorkflowVersionId("my workflow", "1"), WorkflowVersionId("my workflow 2", "2")])

Retrieve Workflow Version

WorkflowVersionAPI.retrieve(workflow_external_id: str, version: str) WorkflowVersion | None

Retrieve a workflow version.

Parameters
  • workflow_external_id (str) – External id of the workflow.

  • version (str) – Version of the workflow.

Returns

The requested workflow version if it exists, None otherwise.

Return type

WorkflowVersion | None

Examples

Retrieve workflow version 1 of workflow my workflow:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.versions.retrieve("my workflow", "1")

List Workflow Versions

WorkflowVersionAPI.list(workflow_version_ids: WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None = None, limit: int = 25) WorkflowVersionList

List workflow versions in the project

Parameters
  • workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.

  • limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None

Returns

The requested workflow versions.

Return type

WorkflowVersionList

Examples

Get all workflow version for workflows ‘my_workflow’ and ‘my_workflow_2’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.versions.list(["my_workflow", "my_workflow_2"])

Get all workflow versions for workflows ‘my_workflow’ and ‘my_workflow_2’ using the WorkflowVersionId class:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowVersionId
>>> client = CogniteClient()
>>> res = client.workflows.versions.list([WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")])

Get all workflow versions for workflows ‘my_workflow’ version ‘1’ and ‘my_workflow_2’ version ‘2’ using tuples:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.versions.list([("my_workflow", "1"), ("my_workflow_2", "2")])

Workflow Executions

List Workflow Executions

WorkflowExecutionAPI.list(workflow_version_ids: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None = None, created_time_start: int | None = None, created_time_end: int | None = None, limit: int = 25) WorkflowExecutionList

List workflow executions in the project.

Parameters
  • workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.

  • created_time_start (int | None) – Filter out executions that was created before this time. Time is in milliseconds since epoch.

  • created_time_end (int | None) – Filter out executions that was created after this time. Time is in milliseconds since epoch.

  • limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns

The requested workflow executions.

Return type

WorkflowExecutionList

Examples

Get all workflow executions for workflows ‘my_workflow’ version ‘1’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.executions.list(("my_workflow", "1"))

Get all workflow executions for workflows after last 24 hours:

>>> from cognite.client import CogniteClient
>>> from datetime import datetime, timedelta
>>> client = CogniteClient()
>>> res = client.workflows.executions.list(created_time_start=int((datetime.now() - timedelta(days=1)).timestamp() * 1000))

Retrieve Detailed Workflow Execution

WorkflowExecutionAPI.retrieve_detailed(id: str) WorkflowExecutionDetailed | None

Retrieve a workflow execution with detailed information.

Parameters

id (str) – The server-generated id of the workflow execution.

Returns

The requested workflow execution if it exists, None otherwise.

Return type

WorkflowExecutionDetailed | None

Examples

Retrieve workflow execution with UUID ‘000560bc-9080-4286-b242-a27bb4819253’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.executions.retrieve_detailed("000560bc-9080-4286-b242-a27bb4819253")

List workflow executions and retrieve detailed information for the first one:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.executions.list()
>>> res = client.workflows.executions.retrieve_detailed(res[0].id)

Trigger Workflow Execution

WorkflowExecutionAPI.trigger(workflow_external_id: str, version: str, input: dict | None = None, metadata: dict | None = None, client_credentials: ClientCredentials | None = None) WorkflowExecution

Trigger a workflow execution.

Parameters
  • workflow_external_id (str) – External id of the workflow.

  • version (str) – Version of the workflow.

  • input (dict | None) – The input to the workflow execution. This will be available for tasks that have specified it as an input with the string “${workflow.input}” See tip below for more information.

  • metadata (dict | None) – Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs.

  • client_credentials (ClientCredentials | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.

Tip

The workflow input can be available in the workflow tasks. For example, if you have a Task with function parameters then you can specify it as follows

>>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters
>>> task = WorkflowTask(
...     external_id="my_workflow-task1",
...     parameters=FunctionTaskParameters(
...         external_id="cdf_deployed_function:my_function",
...         data={"workflow_data": "${workflow.input}",}))
Returns

The created workflow execution.

Return type

WorkflowExecution

Examples

Trigger a workflow execution for the workflow “foo”, version 1:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.executions.trigger("foo", "1")

Trigger a workflow execution with input data:

>>> res = client.workflows.executions.trigger("foo", "1", input={"a": 1, "b": 2})

Trigger a workflow execution using a specific set of client credentials (i.e. not your current credentials):

>>> import os
>>> from cognite.client.data_classes import ClientCredentials
>>> credentials = ClientCredentials("my-client-id", os.environ["MY_CLIENT_SECRET"])
>>> res = client.workflows.executions.trigger("foo", "1", client_credentials=credentials)

Cancel Workflow Execution

WorkflowExecutionAPI.cancel(executions: Sequence[CancelExecution]) Sequence[WorkflowExecution]

cancel a workflow execution.

Parameters

executions (Sequence[CancelExecution]) – List of executions to cancel.

Tip

Cancelling a workflow only prevents it from starting new tasks, tasks already running on other services (transformations and functions) will keep running there.

Returns

The canceled workflow executions.

Return type

Sequence[WorkflowExecution]

Examples

Trigger a workflow execution for the workflow “foo”, version 1 and cancel it:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import CancelExecution
>>> client = CogniteClient()
>>> res = client.workflows.executions.trigger("foo", "1")
>>> client.workflows.executions.cancel([CancelExecution(res.id, "test cancelation")])

Retry Workflow Execution

WorkflowExecutionAPI.retry(id: str, client_credentials: ClientCredentials | None = None) WorkflowExecution

Retry a workflow execution.

Parameters
  • id (str) – The server-generated id of the workflow execution.

  • client_credentials (ClientCredentials | None) – Specific credentials that should be used to retry the workflow execution. When passed will take precedence over the current credentials.

Returns

The retried workflow execution.

Return type

WorkflowExecution

Examples

Retry a workflow execution that has been cancelled or failed:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import CancelExecution
>>> client = CogniteClient()
>>> res = client.workflows.executions.trigger("foo", "1")
>>> client.workflows.executions.cancel([CancelExecution(res.id, "test cancellation")])
>>> client.workflows.executions.retry(res.id)

Workflow Tasks

Update Status of Async Task

WorkflowTaskAPI.update(task_id: str, status: Literal['completed', 'failed'], output: dict | None = None) WorkflowTaskExecution

Update status of async task.

For tasks that has been marked with ‘is_async = True’, the status must be updated by calling this endpoint with either ‘completed’ or ‘failed’.

Parameters
  • task_id (str) – The server-generated id of the task.

  • status (Literal["completed", "failed"]) – The new status of the task. Must be either ‘completed’ or ‘failed’.

  • output (dict | None) – The output of the task. This will be available for tasks that has specified it as an output with the string “${<taskExternalId>.output}”

Returns

The updated task execution.

Return type

WorkflowTaskExecution

Examples

Update task with UUID ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘completed’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "completed")

Update task with UUID ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘failed’ with output ‘{“a”: 1, “b”: 2}’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "failed", output={"a": 1, "b": 2})

Trigger workflow, retrieve detailed task execution and update status of the second task (assumed to be async) to ‘completed’:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.workflows.executions.trigger("my workflow", "1")
>>> res = client.workflows.executions.retrieve_detailed(res.id)
>>> res = client.workflows.tasks.update(res.tasks[1].id, "completed")

Data Workflows data classes

class cognite.client.data_classes.workflows.CDFTaskOutput(response: str | dict | None, status_code: int | None)

Bases: WorkflowTaskOutput

The CDF Request output is used to specify the output of a CDF Request.

Parameters
  • response (str | dict | None) – The response of the CDF Request. Will be a JSON object if content-type is application/json, otherwise will be a string.

  • status_code (int | None) – The status code of the CDF Request.

class cognite.client.data_classes.workflows.CDFTaskParameters(resource_path: str, method: Literal['GET', 'POST', 'PUT', 'DELETE'] | str, query_parameters: dict | str | None = None, body: dict | str | None = None, request_timeout_in_millis: int | str = 10000)

Bases: WorkflowTaskParameters

The CDF request parameters are used to specify a request to the Cognite Data Fusion API.

Parameters
  • resource_path (str) – The resource path of the request. Note the path of the request which is prefixed by ‘{cluster}.cognitedata.com/api/v1/project/{project}’ based on the cluster and project of the request.

  • method (Literal["GET", "POST", "PUT", "DELETE"] | str) – The HTTP method of the request.

  • query_parameters (dict | str | None) – The query parameters of the request. Defaults to None.

  • body (dict | str | None) – The body of the request. Defaults to None. Limited to 1024KiB in size

  • request_timeout_in_millis (int | str) – The timeout of the request in milliseconds. Defaults to 10000.

Examples

Call the asset/list endpoint with a limit of 10:

>>> from cognite.client.data_classes import WorkflowTask, CDFTaskParameters
>>> task = WorkflowTask(
...     external_id="task1",
...     parameters=CDFTaskParameters(
...         resource_path="/assets/list",
...         method="GET",
...         query_parameters={"limit": 10},
...     ),
... )
dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.CancelExecution(id: str, reason: str | None = None)

Bases: object

This class represents a Workflow Version Identifier.

Parameters
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str, optional) – The version of the workflow. Defaults to None.

class cognite.client.data_classes.workflows.DynamicTaskOutput

Bases: WorkflowTaskOutput

The dynamic task output is used to specify the output of a dynamic task.

class cognite.client.data_classes.workflows.DynamicTaskParameters(tasks: list[WorkflowTask] | str)

Bases: WorkflowTaskParameters

The dynamic task parameters are used to specify a dynamic task.

When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter which is a Reference to an array of function, transformation, and cdf task definitions. This array should be generated and returned by a previous step in the workflow, for instance, a Cognite Function task.

Tip

You can reference data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. Some valid option are:

  • ${workflow.input}: The workflow input.

  • ${<taskExternalId>.output}: The output of the task with the given external id.

  • ${<taskExternalId>.input}: The input of the task with the given external id.

  • ${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.

Parameters

tasks (list[WorkflowTask] | str) – The tasks to be dynamically executed. The dynamic task is a string that is evaluated during the workflow’s execution. When calling Version Upsert, the tasks parameter must be a Reference string. When calling Execution details, the tasks parameter will be a list of WorkflowTask objects.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.FunctionTaskOutput(call_id: int | None, function_id: int | None, response: dict | None)

Bases: WorkflowTaskOutput

The class represent the output of Cognite Function task.

Parameters
  • call_id (int | None) – The call_id of the CDF Function call.

  • function_id (int | None) – The function_id of the CDF Function.

  • response (dict | None) – The response of the CDF Function call.

class cognite.client.data_classes.workflows.FunctionTaskParameters(external_id: str, data: dict | str | None = None, is_async_complete: bool | None = None)

Bases: WorkflowTaskParameters

The function parameters are used to specify the Cognite Function to be called.

Parameters
  • external_id (str) – The external ID of the function to be called.

  • data (dict | str | None) – The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information.

  • is_async_complete (bool | None) – Whether the function is asynchronous. Defaults to None, which the API will interpret as False.

If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task. While synchronous tasks update the status automatically.

Tip

You can dynamically specify data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. The valid are:

  • ${workflow.input}: The workflow input.

  • ${<taskExternalId>.output}: The output of the task with the given external id.

  • ${<taskExternalId>.input}: The input of the task with the given external id.

  • ${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.

For example, if you have a workflow containing two tasks, and the external_id of the first task is task1 then, you can specify the data for the second task as follows:

>>> from cognite.client.data_classes  import WorkflowTask, FunctionTaskParameters
>>> task = WorkflowTask(
...     external_id="task2",
...     parameters=FunctionTaskParameters(
...         external_id="cdf_deployed_function",
...         data={
...             "workflow_data": "${workflow.input}",
...             "task1_input": "${task1.input}",
...             "task1_output": "${task1.output}"
...         },
...     ),
... )
dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.SubworkflowTaskOutput

Bases: WorkflowTaskOutput

The subworkflow task output is used to specify the output of a subworkflow task.

class cognite.client.data_classes.workflows.SubworkflowTaskParameters(tasks: list[WorkflowTask])

Bases: WorkflowTaskParameters

The subworkflow task parameters are used to specify a subworkflow task.

When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for conveniece. It takes the tasks parameter which is an array of function, transformation, and cdf task definitions. This array needs to be statically set on the worklow definition (if it needs to be defined at runtime, use a dynamic task).

Parameters

tasks (list[WorkflowTask]) – The tasks belonging to the subworkflow.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.TransformationTaskOutput(job_id: int | None)

Bases: WorkflowTaskOutput

The transformation output is used to specify the output of a transformation task.

Parameters

job_id (int | None) – The job id of the transformation job.

class cognite.client.data_classes.workflows.TransformationTaskParameters(external_id: str, concurrency_policy: Literal['fail', 'restartAfterCurrent', 'waitForCurrent'] = 'fail')

Bases: WorkflowTaskParameters

The transformation parameters are used to specify the transformation to be called.

Parameters
  • external_id (str) – The external ID of the transformation to be called.

  • concurrency_policy (Literal["fail", "restartAfterCurrent", "waitForCurrent"]) –

    Determines the behavior of the task if the Transformation is already running.

    • fail: The task fails if another instance of the Transformation is currently running.

    • waitForCurrent: The task will pause and wait for the already running Transformation to complete. Once completed, the task is completed. This mode is useful for preventing redundant Transformation runs.

    • restartAfterCurrent: The task waits for the ongoing Transformation to finish. After completion, the task restarts the Transformation. This mode ensures that the most recent data can be used by following tasks.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.Workflow(external_id: str, created_time: int, description: str | None = None)

Bases: WorkflowCore

This class represents a workflow. This is the reading version, used when reading or listing a workflows.

Parameters
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • created_time (int) – The time when the workflow was created. Unix timestamp in milliseconds.

  • description (str | None) – Description of the workflow. Defaults to None.

as_write() WorkflowUpsert

Returns this workflow in the writing format.

class cognite.client.data_classes.workflows.WorkflowCore(external_id: str, description: str | None)

Bases: WriteableCogniteResource[WorkflowUpsert], ABC

class cognite.client.data_classes.workflows.WorkflowDefinition(hash_: str, tasks: list[WorkflowTask], description: str | None = None)

Bases: WorkflowDefinitionCore

This class represents a workflow definition. This represents the read version of a workflow definiton.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters
  • hash (str) – The hash of the tasks and description. This is used to uniquely identify the workflow definition as you can overwrite a workflow version.

  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Defaults to None.

as_write() WorkflowDefinitionUpsert

Returns this WorkflowDefinition in its write format.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowDefinitionCore(tasks: list[WorkflowTask], description: str | None)

Bases: WriteableCogniteResource[WorkflowDefinitionUpsert], ABC

This class represents a workflow definition.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters
  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the wokflow definition already has a description, and you want to keep it, you need to provide the description when updating it.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowDefinitionUpsert(tasks: list[WorkflowTask], description: str | None)

Bases: WorkflowDefinitionCore

This class represents a workflow definition. This represents the write/update version of a workflow definiton.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters
  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the wokflow definition already has a description, and you want to keep it, you need to provide the description when updating it.

as_write() WorkflowDefinitionUpsert

Returns this WorkflowDefinitionUpsert in its write format.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowExecution(id: str, workflow_external_id: str, status: Literal['running', 'completed', 'failed', 'timed_out', 'terminated', 'paused'], created_time: int, version: str | None = None, start_time: int | None = None, end_time: int | None = None, reason_for_incompletion: str | None = None, metadata: dict | None = None)

Bases: CogniteResource

This class represents a workflow execution.

Parameters
  • id (str) – The server generated id of the workflow execution.

  • workflow_external_id (str) – The external ID of the workflow.

  • status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]) – The status of the workflow execution.

  • created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.

  • version (str | None) – The version of the workflow. Defaults to None.

  • start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

  • metadata (dict | None) – Application specific metadata.

class cognite.client.data_classes.workflows.WorkflowExecutionDetailed(id: str, workflow_external_id: str, workflow_definition: WorkflowDefinition, status: Literal['running', 'completed', 'failed', 'timed_out', 'terminated', 'paused'], executed_tasks: list[WorkflowTaskExecution], created_time: int, version: str | None = None, start_time: int | None = None, end_time: int | None = None, reason_for_incompletion: str | None = None, input: dict | None = None, metadata: dict | None = None)

Bases: WorkflowExecution

This class represents a detailed workflow execution.

A detailed workflow execution contains the input and output of each task in the workflow execution. In addition, it contains the workflow definition of the workflow.

Parameters
  • id (str) – The server generated id of the workflow execution.

  • workflow_external_id (str) – The external ID of the workflow.

  • workflow_definition (WorkflowDefinition) – The workflow definition of the workflow.

  • status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]) – The status of the workflow execution.

  • executed_tasks (list[WorkflowTaskExecution]) – The executed tasks of the workflow execution.

  • created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.

  • version (str | None) – The version of the workflow. Defaults to None.

  • start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

  • input (dict | None) – Input arguments the workflow was triggered with.

  • metadata (dict | None) – Metadata set when the workflow was triggered.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowExecutionList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[WorkflowExecution]

This class represents a list of workflow executions.

class cognite.client.data_classes.workflows.WorkflowIds(workflow_ids: Collection[WorkflowVersionId])

Bases: UserList

This class represents a list of Workflow Version Identifiers.

class cognite.client.data_classes.workflows.WorkflowList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: WriteableCogniteResourceList[WorkflowUpsert, Workflow], ExternalIDTransformerMixin

This class represents a list of workflows.

as_write() WorkflowUpsertList

Returns these workflows in the writing format.

class cognite.client.data_classes.workflows.WorkflowTask(external_id: str, parameters: WorkflowTaskParameters, name: str | None = None, description: str | None = None, retries: int = 3, timeout: int = 3600, on_failure: Literal['abortWorkflow', 'skipTask'] = 'abortWorkflow', depends_on: list[str] | None = None)

Bases: CogniteResource

This class represents a workflow task.

Note: tasks do not distinguish between write and read versions.

Parameters
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • parameters (WorkflowTaskParameters) – The parameters of the task.

  • name (str | None) – The name of the task. Defaults to None.

  • description (str | None) – The description of the task. Defaults to None.

  • retries (int) – The number of retries for the task. Defaults to 3.

  • timeout (int) – The timeout of the task in seconds. Defaults to 3600.

  • on_failure (Literal["abortWorkflow", "skipTask"]) –

    The policy to handle failures and timeouts. Defaults to abortWorkflow.

    • skipTask: For both failures and timeouts, the task will retry until the retries are exhausted. After that, the Task is marked as COMPLETED_WITH_ERRORS and the subsequent tasks are executed.

    • abortWorkflow: In case of failures, retries will be performed until exhausted. After which the task is marked as FAILED and the Workflow is marked the same. In the event of a timeout, no retries are undertaken; the task is marked as TIMED_OUT and the Workflow is marked as FAILED.

  • depends_on (list[str] | None) – The external ids of the tasks that this task depends on. Defaults to None.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTaskExecution(id: str, external_id: str, status: WorkflowStatus, input: WorkflowTaskParameters, output: WorkflowTaskOutput, version: str | None = None, start_time: int | None = None, end_time: int | None = None, reason_for_incompletion: str | None = None)

Bases: CogniteObject

This class represents a task execution.

Parameters
  • id (str) – The server generated id of the task execution.

  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • status (WorkflowStatus) – The status of the task execution.

  • input (WorkflowTaskParameters) – The input parameters of the task execution.

  • output (WorkflowTaskOutput) – The output of the task execution.

  • version (str | None) – The version of the task execution. Defaults to None.

  • start_time (int | None) – The start time of the task execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the task execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTaskOutput

Bases: ABC

class cognite.client.data_classes.workflows.WorkflowTaskParameters

Bases: CogniteObject, ABC

class cognite.client.data_classes.workflows.WorkflowUpsert(external_id: str, description: str | None)

Bases: WorkflowCore

This class represents a workflow. This is the write version, used when creating or updating a workflow.

Parameters
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • description (str | None) – Description of the workflow. Note that when updating a workflow, the description will always be overwritten also if it is set to None. Meaning if the wokflow already has a description, and you want to keep it, you need to provide the description when updating the workflow.

as_write() WorkflowUpsert

Returns this workflow instance.

class cognite.client.data_classes.workflows.WorkflowUpsertList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[WorkflowUpsert], ExternalIDTransformerMixin

class cognite.client.data_classes.workflows.WorkflowVersion(workflow_external_id: str, version: str, workflow_definition: WorkflowDefinition)

Bases: WorkflowVersionCore

This class represents a workflow version. This is the read variant, used when retrieving/listing a workflow variant.

Parameters
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

  • workflow_definition (WorkflowDefinition) – The workflow definition of the workflow version.

as_write() WorkflowVersionUpsert

Returns a WorkflowVersionUpsert object with the same data.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowVersionCore(workflow_external_id: str, version: str)

Bases: WriteableCogniteResource[WorkflowVersionUpsert], ABC

This class represents a workflow version.

Parameters
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

class cognite.client.data_classes.workflows.WorkflowVersionId(workflow_external_id: str, version: str | None = None)

Bases: object

This class represents a Workflow Version Identifier.

Parameters
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str, optional) – The version of the workflow. Defaults to None.

class cognite.client.data_classes.workflows.WorkflowVersionList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: WriteableCogniteResourceList[WorkflowVersionUpsert, WorkflowVersion]

This class represents a list of workflow versions.

as_ids() WorkflowIds

Returns a WorkflowIds object with the workflow version ids.

as_write() WorkflowVersionUpsertList

Returns a WorkflowVersionUpsertList object with the same data.

class cognite.client.data_classes.workflows.WorkflowVersionUpsert(workflow_external_id: str, version: str, workflow_definition: WorkflowDefinitionUpsert)

Bases: WorkflowVersionCore

This class represents a workflow version. This is the write-variant, used when creating or updating a workflow variant.

Parameters
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

  • workflow_definition (WorkflowDefinitionUpsert) – The workflow definition of the workflow version.

as_write() WorkflowVersionUpsert

Returns this WorkflowVersionUpsert instance.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns

A dictionary representation of the instance.

Return type

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowVersionUpsertList(resources: Collection[Any], cognite_client: CogniteClient | None = None)

Bases: CogniteResourceList[WorkflowVersionUpsert]

This class represents a list of workflow versions.

as_ids() WorkflowIds

Returns a WorkflowIds object with the workflow version ids.