Skip to content

flow

openml._api.resources.flow #

FlowV1API #

FlowV1API(http: HTTPClient, minio: MinIOClient)

Bases: ResourceV1API, FlowAPI

Source code in openml/_api/resources/base/base.py
def __init__(self, http: HTTPClient, minio: MinIOClient):
    """Keep references to the clients this resource API talks through.

    Parameters
    ----------
    http : HTTPClient
        Client stored as ``self._http``.
    minio : MinIOClient
        Client stored as ``self._minio``.
    """
    self._http, self._minio = http, minio

delete #

delete(resource_id: int) -> bool

Delete a resource using the V1 API.

PARAMETER DESCRIPTION
resource_id

Identifier of the resource to delete.

TYPE: int

RETURNS DESCRIPTION
bool

True if the server confirms successful deletion.

RAISES DESCRIPTION
ValueError

If the resource type is not supported for deletion.

OpenMLNotAuthorizedError

If the user is not permitted to delete the resource.

OpenMLServerError

If deletion fails for an unknown reason.

OpenMLServerException

For other server-side errors.

Source code in openml/_api/resources/base/versions.py
def delete(self, resource_id: int) -> bool:
    """
    Delete a resource using the V1 API.

    Parameters
    ----------
    resource_id : int
        Identifier of the resource to delete.

    Returns
    -------
    bool
        ``True`` if the parsed response contains the
        ``oml:<endpoint>_delete`` confirmation element, ``False`` otherwise.

    Raises
    ------
    ValueError
        If the resource type is not supported for deletion.
    OpenMLNotAuthorizedError
        If the user is not permitted to delete the resource.
    OpenMLServerError
        If deletion fails for an unknown reason.
    OpenMLServerException
        For other server-side errors.
    """
    if self.resource_type not in _LEGAL_RESOURCES_DELETE:
        raise ValueError(f"Can't delete a {self.resource_type.value}")

    endpoint = self._get_endpoint_name()
    target = f"{endpoint}/{resource_id}"
    try:
        reply = self._http.delete(target)
        parsed = xmltodict.parse(reply.content)
    except OpenMLServerException as exc:
        # Give the handler a chance to raise a more specific error; if it
        # returns normally, the original exception is re-raised unchanged.
        self._handle_delete_exception(endpoint, exc)
        raise
    else:
        return f"oml:{endpoint}_delete" in parsed

exists #

exists(name: str, external_version: str) -> int | bool

Check if a flow exists on the OpenML server.

PARAMETER DESCRIPTION
name

The name of the flow.

TYPE: str

external_version

The external version of the flow.

TYPE: str

RETURNS DESCRIPTION
int | bool

The flow ID if the flow exists, False otherwise.

Source code in openml/_api/resources/flow.py
def exists(self, name: str, external_version: str) -> int | bool:
    """Check if a flow exists on the OpenML server.

    Parameters
    ----------
    name : str
        The name of the flow. Must be a non-empty string.
    external_version : str
        The external version of the flow. Must be a non-empty string.

    Returns
    -------
    int | bool
        The flow ID if the server reports a positive ID, False otherwise.

    Raises
    ------
    ValueError
        If ``name`` or ``external_version`` is not a non-empty string.
    OpenMLServerException
        If the parsed response contains an ``oml:error`` element.
    """
    if not (isinstance(name, str) and name):
        raise ValueError("Argument 'name' should be a non-empty string")
    if not (isinstance(external_version, str) and external_version):
        # Name the actual parameter so the caller can tell which argument
        # was rejected (there is no parameter called 'version').
        raise ValueError("Argument 'external_version' should be a non-empty string")

    data: dict[str, str] = {"name": name, "external_version": external_version}
    # The key is sent as a form field and the request is issued with
    # use_api_key=False -- presumably so the client does not attach it a
    # second time; confirm against HTTPClient.post.
    if self._http.api_key:
        data["api_key"] = self._http.api_key

    xml_response = self._http.post("flow/exists", data=data, use_api_key=False).text
    result_dict = xmltodict.parse(xml_response)

    # Detect error payloads and raise
    if "oml:error" in result_dict:
        err = result_dict["oml:error"]
        code = int(err["oml:code"]) if "oml:code" in err else None
        message = err.get("oml:message", "Server returned an error")
        raise OpenMLServerException(message=message, code=code)

    flow_id = int(result_dict["oml:flow_exists"]["oml:id"])
    # A non-positive ID is treated as "no such flow".
    return flow_id if flow_id > 0 else False

get #

get(flow_id: int, *, reset_cache: bool = False) -> OpenMLFlow

Get a flow from the OpenML server.

PARAMETER DESCRIPTION
flow_id

The ID of the flow to retrieve.

TYPE: int

reset_cache

Whether to reset the cache for this request.

TYPE: bool, optional DEFAULT: False

RETURNS DESCRIPTION
OpenMLFlow

The retrieved flow object.

Source code in openml/_api/resources/flow.py
def get(self, flow_id: int, *, reset_cache: bool = False) -> OpenMLFlow:
    """Get a flow from the OpenML server.

    Parameters
    ----------
    flow_id : int
        The ID of the flow to retrieve.
    reset_cache : bool, optional (default=False)
        Forwarded to the HTTP client as ``refresh_cache``; caching itself
        is always enabled for this request.

    Returns
    -------
    OpenMLFlow
        The flow built from the parsed XML description.
    """
    flow_xml = self._http.get(
        f"flow/{flow_id}", enable_cache=True, refresh_cache=reset_cache
    ).text
    flow_description = xmltodict.parse(flow_xml)
    return OpenMLFlow._from_dict(flow_description)

list #

list(limit: int | None = None, offset: int | None = None, tag: str | None = None, uploader: str | None = None) -> DataFrame

List flows on the OpenML server.

PARAMETER DESCRIPTION
limit

The maximum number of flows to return. By default, all flows are returned.

TYPE: int | None DEFAULT: None

offset

The number of flows to skip before starting to collect the result set. By default, no flows are skipped.

TYPE: int | None DEFAULT: None

tag

The tag to filter flows by. By default, no tag filtering is applied.

TYPE: str | None DEFAULT: None

uploader

The user to filter flows by. By default, no user filtering is applied.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame

A DataFrame containing the list of flows.

Source code in openml/_api/resources/flow.py
def list(
    self,
    limit: int | None = None,
    offset: int | None = None,
    tag: str | None = None,
    uploader: str | None = None,
) -> pd.DataFrame:
    """List flows on the OpenML server.

    Parameters
    ----------
    limit : int, optional
        The maximum number of flows to return.
        By default, all flows are returned.
    offset : int, optional
        The number of flows to skip before starting to collect the result set.
        By default, no flows are skipped.
    tag : str, optional
        The tag to filter flows by.
        By default, no tag filtering is applied.
    uploader : str, optional
        The user to filter flows by.
        By default, no user filtering is applied.

    Returns
    -------
    pd.DataFrame
        One row per flow, indexed by flow ID, with the columns ``id``,
        ``full_name``, ``name``, ``version``, ``external_version`` and
        ``uploader``.

    Raises
    ------
    OpenMLServerException
        If the parsed response contains an ``oml:error`` element.
    """
    # Each filter becomes a "<key>/<value>" path segment, always in the
    # order limit, offset, tag, uploader.
    segments = ["flow/list"]
    if limit is not None:
        segments.append(f"limit/{limit}")
    if offset is not None:
        segments.append(f"offset/{offset}")
    if tag is not None:
        # safe='' so that "/" inside the value is percent-encoded as well.
        segments.append(f"tag/{quote(str(tag), safe='')}")
    if uploader is not None:
        segments.append(f"uploader/{quote(str(uploader), safe='')}")

    xml_string = self._http.get("/".join(segments)).text
    # force_list keeps "oml:flow" a list even when only one flow is returned.
    parsed = xmltodict.parse(xml_string, force_list=("oml:flow",))

    if "oml:error" in parsed:
        error = parsed["oml:error"]
        if "oml:code" in error:
            code = int(error.get("oml:code", 0))
        else:
            code = None
        message = error.get("oml:message", "Server returned an error")
        raise OpenMLServerException(message=message, code=code)

    flows_node = parsed["oml:flows"]
    assert isinstance(flows_node["oml:flow"], list), type(flows_node)
    assert flows_node["@xmlns:oml"] == "http://openml.org/openml", flows_node["@xmlns:oml"]

    records = [
        {
            "id": int(entry["oml:id"]),
            "full_name": entry["oml:full_name"],
            "name": entry["oml:name"],
            "version": entry["oml:version"],
            "external_version": entry["oml:external_version"],
            "uploader": entry["oml:uploader"],
        }
        for entry in flows_node["oml:flow"]
    ]
    # Key the rows by flow ID so the ID becomes the DataFrame index.
    by_id: dict[int, dict[str, Any]] = {record["id"]: record for record in records}
    return pd.DataFrame.from_dict(by_id, orient="index")

publish #

publish(path: str, files: Mapping[str, Any] | None) -> int

Publish a new resource using the V1 API.

PARAMETER DESCRIPTION
path

API endpoint path for the upload.

TYPE: str

files

Files to upload as part of the request payload.

TYPE: Mapping[str, Any] | None

RETURNS DESCRIPTION
int

Identifier of the newly created resource.

RAISES DESCRIPTION
ValueError

If the server response does not contain a valid resource ID.

OpenMLServerException

If the server returns an error during upload.

Source code in openml/_api/resources/base/versions.py
def publish(self, path: str, files: Mapping[str, Any] | None) -> int:
    """
    Publish a new resource using the V1 API.

    Parameters
    ----------
    path : str
        API endpoint path for the upload.
    files : Mapping of str to Any or None
        Files to upload as part of the request payload.

    Returns
    -------
    int
        Identifier of the newly created resource, as extracted from the
        parsed upload response.

    Raises
    ------
    ValueError
        If the server response does not contain a valid resource ID.
    OpenMLServerException
        If the server returns an error during upload.
    """
    upload_reply = self._http.post(path, files=files)
    upload_document = xmltodict.parse(upload_reply.content)
    return self._extract_id_from_upload(upload_document)

tag #

tag(resource_id: int, tag: str) -> list[str]

Add a tag to a resource using the V1 API.

PARAMETER DESCRIPTION
resource_id

Identifier of the resource to tag.

TYPE: int

tag

Tag to associate with the resource.

TYPE: str

RETURNS DESCRIPTION
list of str

Updated list of tags assigned to the resource.

RAISES DESCRIPTION
ValueError

If the resource type does not support tagging.

OpenMLServerException

If the server returns an error.

Source code in openml/_api/resources/base/versions.py
def tag(self, resource_id: int, tag: str) -> list[str]:
    """
    Add a tag to a resource using the V1 API.

    Parameters
    ----------
    resource_id : int
        Identifier of the resource to tag.
    tag : str
        Tag to associate with the resource.

    Returns
    -------
    list of str
        Tags listed in the server response; empty if the response
        contains no ``oml:tag`` element.

    Raises
    ------
    ValueError
        If the resource type does not support tagging.
    OpenMLServerException
        If the server returns an error.
    """
    if self.resource_type not in _LEGAL_RESOURCES_TAG:
        raise ValueError(f"Can't tag a {self.resource_type.value}")

    endpoint = self._get_endpoint_name()
    payload = {f"{endpoint}_id": resource_id, "tag": tag}
    reply = self._http.post(f"{endpoint}/tag", data=payload)

    # force_list keeps "oml:tag" a list even when a single tag is returned.
    document = xmltodict.parse(reply.content, force_list={"oml:tag"})
    tags: list[str] = document[f"oml:{endpoint}_tag"].get("oml:tag", [])
    return tags

untag #

untag(resource_id: int, tag: str) -> list[str]

Remove a tag from a resource using the V1 API.

PARAMETER DESCRIPTION
resource_id

Identifier of the resource to untag.

TYPE: int

tag

Tag to remove from the resource.

TYPE: str

RETURNS DESCRIPTION
list of str

Updated list of tags assigned to the resource.

RAISES DESCRIPTION
ValueError

If the resource type does not support tagging.

OpenMLServerException

If the server returns an error.

Source code in openml/_api/resources/base/versions.py
def untag(self, resource_id: int, tag: str) -> list[str]:
    """
    Remove a tag from a resource using the V1 API.

    Parameters
    ----------
    resource_id : int
        Identifier of the resource to untag.
    tag : str
        Tag to remove from the resource.

    Returns
    -------
    list of str
        Tags listed in the server response; empty if the response
        contains no ``oml:tag`` element.

    Raises
    ------
    ValueError
        If the resource type does not support tagging.
    OpenMLServerException
        If the server returns an error.
    """
    if self.resource_type not in _LEGAL_RESOURCES_TAG:
        raise ValueError(f"Can't untag a {self.resource_type.value}")

    endpoint = self._get_endpoint_name()
    payload = {f"{endpoint}_id": resource_id, "tag": tag}
    reply = self._http.post(f"{endpoint}/untag", data=payload)

    # force_list keeps "oml:tag" a list even when a single tag remains.
    document = xmltodict.parse(reply.content, force_list={"oml:tag"})
    remaining: list[str] = document[f"oml:{endpoint}_untag"].get("oml:tag", [])
    return remaining

FlowV2API #

FlowV2API(http: HTTPClient, minio: MinIOClient)

Bases: ResourceV2API, FlowAPI

Source code in openml/_api/resources/base/base.py
def __init__(self, http: HTTPClient, minio: MinIOClient):
    """Keep references to the clients this resource API talks through.

    Parameters
    ----------
    http : HTTPClient
        Client stored as ``self._http``.
    minio : MinIOClient
        Client stored as ``self._minio``.
    """
    self._http, self._minio = http, minio

exists #

exists(name: str, external_version: str) -> int | bool

Check if a flow exists on the OpenML v2 server.

PARAMETER DESCRIPTION
name

The name of the flow.

TYPE: str

external_version

The external version of the flow.

TYPE: str

RETURNS DESCRIPTION
int | bool

The flow ID if the flow exists, False otherwise.

Source code in openml/_api/resources/flow.py
def exists(self, name: str, external_version: str) -> int | bool:
    """Check if a flow exists on the OpenML v2 server.

    Parameters
    ----------
    name : str
        The name of the flow. Must be a non-empty string.
    external_version : str
        The external version of the flow. Must be a non-empty string.

    Returns
    -------
    int | bool
        The ``flow_id`` value from the JSON response if present, False
        otherwise (including when the server reports no result or a 404).

    Raises
    ------
    ValueError
        If ``name`` or ``external_version`` is not a non-empty string.
    OpenMLServerException
        For server errors other than "no result" / code 404.
    """
    if not (isinstance(name, str) and name):
        raise ValueError("Argument 'name' should be a non-empty string")
    if not (isinstance(external_version, str) and external_version):
        # Name the actual parameter so the caller can tell which argument
        # was rejected (there is no parameter called 'version').
        raise ValueError("Argument 'external_version' should be a non-empty string")

    # Both values become URL path segments, so "/" must be escaped too.
    name_path = quote(name, safe="")
    version_path = quote(external_version, safe="")

    try:
        response = self._http.get(f"flows/exists/{name_path}/{version_path}/")
        result = response.json()
        flow_id: int | bool = result.get("flow_id", False)
        return flow_id
    except OpenMLServerNoResult:
        return False
    except OpenMLServerException as err:
        # A 404 is treated as "flow does not exist" rather than a failure.
        if err.code == 404:
            return False
        raise

get #

get(flow_id: int, *, reset_cache: bool = False) -> OpenMLFlow

Get a flow from the OpenML v2 server.

PARAMETER DESCRIPTION
flow_id

The ID of the flow to retrieve.

TYPE: int

reset_cache

Whether to reset the cache for this request.

TYPE: bool, optional DEFAULT: False

RETURNS DESCRIPTION
OpenMLFlow

The retrieved flow object.

Source code in openml/_api/resources/flow.py
def get(self, flow_id: int, *, reset_cache: bool = False) -> OpenMLFlow:
    """Get a flow from the OpenML v2 server.

    Parameters
    ----------
    flow_id : int
        The ID of the flow to retrieve.
    reset_cache : bool, optional (default=False)
        Forwarded to the HTTP client as ``refresh_cache``; caching itself
        is always enabled for this request.

    Returns
    -------
    OpenMLFlow
        The flow built from the converted response.
    """
    payload = self._http.get(
        f"flows/{flow_id}/", enable_cache=True, refresh_cache=reset_cache
    ).json()

    # OpenMLFlow._from_dict() expects the v1 layout, so reshape the v2
    # JSON before handing it over.
    v1_shaped = self._convert_v2_to_v1_format(payload)
    return OpenMLFlow._from_dict(v1_shaped)