openml.runs #

OpenMLRun #

OpenMLRun(task_id: int, flow_id: int | None, dataset_id: int | None, setup_string: str | None = None, output_files: dict[str, int] | None = None, setup_id: int | None = None, tags: list[str] | None = None, uploader: int | None = None, uploader_name: str | None = None, evaluations: dict | None = None, fold_evaluations: dict | None = None, sample_evaluations: dict | None = None, data_content: list[list] | None = None, trace: OpenMLRunTrace | None = None, model: object | None = None, task_type: str | None = None, task_evaluation_measure: str | None = None, flow_name: str | None = None, parameter_settings: list[dict[str, Any]] | None = None, predictions_url: str | None = None, task: OpenMLTask | None = None, flow: OpenMLFlow | None = None, run_id: int | None = None, description_text: str | None = None, run_details: str | None = None)

Bases: OpenMLBase

OpenML Run: result of running a model on an OpenML dataset.

Parameters#

task_id: int The ID of the OpenML task associated with the run.
flow_id: int The ID of the OpenML flow associated with the run.
dataset_id: int The ID of the OpenML dataset used for the run.
setup_string: str The setup string of the run.
output_files: Dict[str, int] Specifies where each related file can be found.
setup_id: int An integer representing the ID of the setup used for the run.
tags: List[str] Representing the tags associated with the run.
uploader: int User ID of the uploader.
uploader_name: str The name of the person who uploaded the run.
evaluations: Dict Representing the evaluations of the run.
fold_evaluations: Dict The evaluations of the run for each fold.
sample_evaluations: Dict The evaluations of the run for each sample.
data_content: List[List] The predictions generated from executing this run.
trace: OpenMLRunTrace The trace containing information on internal model evaluations of this run.
model: object The untrained model that was evaluated in the run.
task_type: str The type of the OpenML task associated with the run.
task_evaluation_measure: str The evaluation measure used for the task.
flow_name: str The name of the OpenML flow associated with the run.
parameter_settings: list[OrderedDict] Representing the parameter settings used for the run.
predictions_url: str The URL of the predictions file.
task: OpenMLTask An instance of the OpenMLTask class, representing the OpenML task associated with the run.
flow: OpenMLFlow An instance of the OpenMLFlow class, representing the OpenML flow associated with the run.
run_id: int The ID of the run.
description_text: str, optional Description text to add to the predictions file. If left None, is set to the time the arff file is generated.
run_details: str, optional (default=None) Description of the run stored in the run meta-data.

Source code in openml/runs/run.py
def __init__(  # noqa: PLR0913
    self,
    task_id: int,
    flow_id: int | None,
    dataset_id: int | None,
    setup_string: str | None = None,
    output_files: dict[str, int] | None = None,
    setup_id: int | None = None,
    tags: list[str] | None = None,
    uploader: int | None = None,
    uploader_name: str | None = None,
    evaluations: dict | None = None,
    fold_evaluations: dict | None = None,
    sample_evaluations: dict | None = None,
    data_content: list[list] | None = None,
    trace: OpenMLRunTrace | None = None,
    model: object | None = None,
    task_type: str | None = None,
    task_evaluation_measure: str | None = None,
    flow_name: str | None = None,
    parameter_settings: list[dict[str, Any]] | None = None,
    predictions_url: str | None = None,
    task: OpenMLTask | None = None,
    flow: OpenMLFlow | None = None,
    run_id: int | None = None,
    description_text: str | None = None,
    run_details: str | None = None,
):
    self.uploader = uploader
    self.uploader_name = uploader_name
    self.task_id = task_id
    self.task_type = task_type
    self.task_evaluation_measure = task_evaluation_measure
    self.flow_id = flow_id
    self.flow_name = flow_name
    self.setup_id = setup_id
    self.setup_string = setup_string
    self.parameter_settings = parameter_settings
    self.dataset_id = dataset_id
    self.evaluations = evaluations
    self.fold_evaluations = fold_evaluations
    self.sample_evaluations = sample_evaluations
    self.data_content = data_content
    self.output_files = output_files
    self.trace = trace
    self.error_message = None
    self.task = task
    self.flow = flow
    self.run_id = run_id
    self.model = model
    self.tags = tags
    self.predictions_url = predictions_url
    self.description_text = description_text
    self.run_details = run_details
    self._predictions = None

id property #

id: int | None

The ID of the run, None if not uploaded to the server yet.

openml_url property #

openml_url: str | None

The URL of the object on the server, if it was uploaded, else None.

predictions property #

predictions: DataFrame

Return a DataFrame with predictions for this run
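
For runs fetched from the server, the predictions file is downloaded on first access. A minimal sketch, assuming the run id below exists and has a predictions file attached:

import openml

run = openml.runs.get_run(10)   # illustrative run id
df = run.predictions            # pandas DataFrame with one row per prediction
print(df.head())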

from_filesystem classmethod #

from_filesystem(directory: str | Path, expect_model: bool = True) -> OpenMLRun

The inverse of the to_filesystem method. Instantiates an OpenMLRun object based on files stored on the file system.

Parameters#

directory : str a path leading to the folder where the results are stored

expect_model : bool if True, it requires the model pickle to be present, and an error will be thrown if not. Otherwise, the model might or might not be present.

Returns#

run : OpenMLRun the re-instantiated run object

Source code in openml/runs/run.py
@classmethod
def from_filesystem(cls, directory: str | Path, expect_model: bool = True) -> OpenMLRun:  # noqa: FBT001, FBT002
    """
    The inverse of the to_filesystem method. Instantiates an OpenMLRun
    object based on files stored on the file system.

    Parameters
    ----------
    directory : str
        a path leading to the folder where the results
        are stored

    expect_model : bool
        if True, it requires the model pickle to be present, and an error
        will be thrown if not. Otherwise, the model might or might not
        be present.

    Returns
    -------
    run : OpenMLRun
        the re-instantiated run object
    """
    # Avoiding cyclic imports
    import openml.runs.functions

    directory = Path(directory)
    if not directory.is_dir():
        raise ValueError("Could not find folder")

    description_path = directory / "description.xml"
    predictions_path = directory / "predictions.arff"
    trace_path = directory / "trace.arff"
    model_path = directory / "model.pkl"

    if not description_path.is_file():
        raise ValueError("Could not find description.xml")
    if not predictions_path.is_file():
        raise ValueError("Could not find predictions.arff")
    if (not model_path.is_file()) and expect_model:
        raise ValueError("Could not find model.pkl")

    with description_path.open() as fht:
        xml_string = fht.read()
    run = openml.runs.functions._create_run_from_xml(xml_string, from_server=False)

    if run.flow_id is None:
        flow = openml.flows.OpenMLFlow.from_filesystem(directory)
        run.flow = flow
        run.flow_name = flow.name

    with predictions_path.open() as fht:
        predictions = arff.load(fht)
        run.data_content = predictions["data"]

    if model_path.is_file():
        # note that it will load the model if the file exists, even if
        # expect_model is False
        with model_path.open("rb") as fhb:
            run.model = pickle.load(fhb)  # noqa: S301

    if trace_path.is_file():
        run.trace = openml.runs.OpenMLRunTrace._from_filesystem(trace_path)

    return run

get_metric_fn #

get_metric_fn(sklearn_fn: Callable, kwargs: dict | None = None) -> ndarray

Calculates metric scores based on predicted values. Assumes the run has been executed locally (and contains run_data). Furthermore, it assumes that the 'correct' or 'truth' attribute is specified in the arff (which is an optional field, but always the case for openml-python runs)

Parameters#

sklearn_fn : function a function pointer to a sklearn function that accepts y_true, y_pred and **kwargs

kwargs : dict kwargs for the function

Returns#

scores : ndarray of scores of length num_folds * num_repeats metric results

Source code in openml/runs/run.py
def get_metric_fn(self, sklearn_fn: Callable, kwargs: dict | None = None) -> np.ndarray:  # noqa: PLR0915, PLR0912, C901
    """Calculates metric scores based on predicted values. Assumes the
    run has been executed locally (and contains run_data). Furthermore,
    it assumes that the 'correct' or 'truth' attribute is specified in
    the arff (which is an optional field, but always the case for
    openml-python runs)

    Parameters
    ----------
    sklearn_fn : function
        a function pointer to a sklearn function that
        accepts ``y_true``, ``y_pred`` and ``**kwargs``
    kwargs : dict
        kwargs for the function

    Returns
    -------
    scores : ndarray of scores of length num_folds * num_repeats
        metric results
    """
    kwargs = kwargs if kwargs else {}
    if self.data_content is not None and self.task_id is not None:
        predictions_arff = self._generate_arff_dict()
    elif (self.output_files is not None) and ("predictions" in self.output_files):
        predictions_file_url = openml._api_calls._file_id_to_url(
            self.output_files["predictions"],
            "predictions.arff",
        )
        response = openml._api_calls._download_text_file(predictions_file_url)
        predictions_arff = arff.loads(response)
        # TODO: make this a stream reader
    else:
        raise ValueError(
            "Run should have been locally executed or " "contain outputfile reference.",
        )

    # Need to know more about the task to compute scores correctly
    task = get_task(self.task_id)

    attribute_names = [att[0] for att in predictions_arff["attributes"]]
    if (
        task.task_type_id in [TaskType.SUPERVISED_CLASSIFICATION, TaskType.LEARNING_CURVE]
        and "correct" not in attribute_names
    ):
        raise ValueError('Attribute "correct" should be set for ' "classification task runs")
    if task.task_type_id == TaskType.SUPERVISED_REGRESSION and "truth" not in attribute_names:
        raise ValueError('Attribute "truth" should be set for ' "regression task runs")
    if task.task_type_id != TaskType.CLUSTERING and "prediction" not in attribute_names:
        raise ValueError('Attribute "predict" should be set for ' "supervised task runs")

    def _attribute_list_to_dict(attribute_list):  # type: ignore
        # convenience function: Creates a mapping to map from the name of
        # attributes present in the arff prediction file to their index.
        # This is necessary because the number of classes can be different
        # for different tasks.
        res = OrderedDict()
        for idx in range(len(attribute_list)):
            res[attribute_list[idx][0]] = idx
        return res

    attribute_dict = _attribute_list_to_dict(predictions_arff["attributes"])

    repeat_idx = attribute_dict["repeat"]
    fold_idx = attribute_dict["fold"]
    predicted_idx = attribute_dict["prediction"]  # Assume supervised task

    if task.task_type_id in (TaskType.SUPERVISED_CLASSIFICATION, TaskType.LEARNING_CURVE):
        correct_idx = attribute_dict["correct"]
    elif task.task_type_id == TaskType.SUPERVISED_REGRESSION:
        correct_idx = attribute_dict["truth"]
    has_samples = False
    if "sample" in attribute_dict:
        sample_idx = attribute_dict["sample"]
        has_samples = True

    if (
        predictions_arff["attributes"][predicted_idx][1]
        != predictions_arff["attributes"][correct_idx][1]
    ):
        pred = predictions_arff["attributes"][predicted_idx][1]
        corr = predictions_arff["attributes"][correct_idx][1]
        raise ValueError(
            "Predicted and Correct do not have equal values:" f" {pred!s} Vs. {corr!s}",
        )

    # TODO: these could be cached
    values_predict: dict[int, dict[int, dict[int, list[float]]]] = {}
    values_correct: dict[int, dict[int, dict[int, list[float]]]] = {}
    for _line_idx, line in enumerate(predictions_arff["data"]):
        rep = line[repeat_idx]
        fold = line[fold_idx]
        samp = line[sample_idx] if has_samples else 0

        if task.task_type_id in [
            TaskType.SUPERVISED_CLASSIFICATION,
            TaskType.LEARNING_CURVE,
        ]:
            prediction = predictions_arff["attributes"][predicted_idx][1].index(
                line[predicted_idx],
            )
            correct = predictions_arff["attributes"][predicted_idx][1].index(line[correct_idx])
        elif task.task_type_id == TaskType.SUPERVISED_REGRESSION:
            prediction = line[predicted_idx]
            correct = line[correct_idx]
        if rep not in values_predict:
            values_predict[rep] = OrderedDict()
            values_correct[rep] = OrderedDict()
        if fold not in values_predict[rep]:
            values_predict[rep][fold] = OrderedDict()
            values_correct[rep][fold] = OrderedDict()
        if samp not in values_predict[rep][fold]:
            values_predict[rep][fold][samp] = []
            values_correct[rep][fold][samp] = []

        values_predict[rep][fold][samp].append(prediction)
        values_correct[rep][fold][samp].append(correct)

    scores = []
    for rep in values_predict:
        for fold in values_predict[rep]:
            last_sample = len(values_predict[rep][fold]) - 1
            y_pred = values_predict[rep][fold][last_sample]
            y_true = values_correct[rep][fold][last_sample]
            scores.append(sklearn_fn(y_true, y_pred, **kwargs))
    return np.array(scores)
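
As a hedged example, the per-fold scores of a locally executed classification run could be computed with a scikit-learn metric. This sketch assumes scikit-learn and the OpenML scikit-learn extension are installed and that task 31 exists on the server:

import openml
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

task = openml.tasks.get_task(31)                             # illustrative task id
run = openml.runs.run_model_on_task(DecisionTreeClassifier(), task)
scores = run.get_metric_fn(accuracy_score)                   # one score per fold * repeat
print(scores.mean(), scores.std())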

open_in_browser #

open_in_browser() -> None

Opens the OpenML web page corresponding to this object in your default browser.

Source code in openml/base.py
def open_in_browser(self) -> None:
    """Opens the OpenML web page corresponding to this object in your default browser."""
    if self.openml_url is None:
        raise ValueError(
            "Cannot open element on OpenML.org when attribute `openml_url` is `None`",
        )

    webbrowser.open(self.openml_url)

publish #

publish() -> OpenMLBase

Publish the object on the OpenML server.

Source code in openml/base.py
def publish(self) -> OpenMLBase:
    """Publish the object on the OpenML server."""
    file_elements = self._get_file_elements()

    if "description" not in file_elements:
        file_elements["description"] = self._to_xml()

    call = f"{_get_rest_api_type_alias(self)}/"
    response_text = openml._api_calls._perform_api_call(
        call,
        "post",
        file_elements=file_elements,
    )
    xml_response = xmltodict.parse(response_text)

    self._parse_publish_response(xml_response)
    return self

push_tag #

push_tag(tag: str) -> None

Annotates this entity with a tag on the server.

Parameters#

tag : str Tag to attach to the flow.

Source code in openml/base.py
def push_tag(self, tag: str) -> None:
    """Annotates this entity with a tag on the server.

    Parameters
    ----------
    tag : str
        Tag to attach to the flow.
    """
    _tag_openml_base(self, tag)

remove_tag #

remove_tag(tag: str) -> None

Removes a tag from this entity on the server.

Parameters#

tag : str Tag to attach to the flow.

Source code in openml/base.py
def remove_tag(self, tag: str) -> None:
    """Removes a tag from this entity on the server.

    Parameters
    ----------
    tag : str
        Tag to attach to the flow.
    """
    _tag_openml_base(self, tag, untag=True)

to_filesystem #

to_filesystem(directory: str | Path, store_model: bool = True) -> None

The inverse of the from_filesystem method. Serializes a run on the filesystem, to be uploaded later.

Parameters#

directory : str a path leading to the folder where the results will be stored. Should be empty

store_model : bool, optional (default=True) if True, a model will be pickled as well. As this is the most storage expensive part, it is often desirable to not store the model.

Source code in openml/runs/run.py
def to_filesystem(
    self,
    directory: str | Path,
    store_model: bool = True,  # noqa: FBT001, FBT002
) -> None:
    """
    The inverse of the from_filesystem method. Serializes a run
    on the filesystem, to be uploaded later.

    Parameters
    ----------
    directory : str
        a path leading to the folder where the results
        will be stored. Should be empty

    store_model : bool, optional (default=True)
        if True, a model will be pickled as well. As this is the most
        storage expensive part, it is often desirable to not store the
        model.
    """
    if self.data_content is None or self.model is None:
        raise ValueError("Run should have been executed (and contain " "model / predictions)")
    directory = Path(directory)
    directory.mkdir(exist_ok=True, parents=True)

    if any(directory.iterdir()):
        raise ValueError(f"Output directory {directory.expanduser().resolve()} should be empty")

    run_xml = self._to_xml()
    predictions_arff = arff.dumps(self._generate_arff_dict())

    # It seems like typing does not allow to define the same variable multiple times
    with (directory / "description.xml").open("w") as fh:
        fh.write(run_xml)
    with (directory / "predictions.arff").open("w") as fh:
        fh.write(predictions_arff)
    if store_model:
        with (directory / "model.pkl").open("wb") as fh_b:
            pickle.dump(self.model, fh_b)

    if self.flow_id is None and self.flow is not None:
        self.flow.to_filesystem(directory)

    if self.trace is not None:
        self.trace._to_filesystem(directory)
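
A minimal round-trip sketch, assuming the scikit-learn extension is installed, task 31 exists on the server, and the target directory is empty (all names are illustrative):

import openml
from sklearn.tree import DecisionTreeClassifier

task = openml.tasks.get_task(31)
run = openml.runs.run_model_on_task(DecisionTreeClassifier(), task)
run.to_filesystem("my_run", store_model=True)               # writes description.xml, predictions.arff, model.pkl
reloaded = openml.runs.OpenMLRun.from_filesystem("my_run")  # inverse operation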

url_for_id classmethod #

url_for_id(id_: int) -> str

Return the OpenML URL for the object of the class entity with the given id.

Source code in openml/base.py
@classmethod
def url_for_id(cls, id_: int) -> str:
    """Return the OpenML URL for the object of the class entity with the given id."""
    # Sample url for a flow: openml.org/f/123
    return f"{openml.config.get_server_base_url()}/{cls._entity_letter()}/{id_}"

OpenMLRunTrace #

OpenMLRunTrace(run_id: int | None, trace_iterations: dict[tuple[int, int, int], OpenMLTraceIteration])

OpenML Run Trace: parsed output from Run Trace call

Parameters#

run_id : int OpenML run id.

trace_iterations : dict Mapping from key (repeat, fold, iteration) to an object of OpenMLTraceIteration.

Parameters#

run_id : int Id for which the trace content is to be stored.
trace_iterations : List[List] The trace content obtained by running a flow on a task.

Source code in openml/runs/trace.py
def __init__(
    self,
    run_id: int | None,
    trace_iterations: dict[tuple[int, int, int], OpenMLTraceIteration],
):
    """Object to hold the trace content of a run.

    Parameters
    ----------
    run_id : int
        Id for which the trace content is to be stored.
    trace_iterations : List[List]
        The trace content obtained by running a flow on a task.
    """
    self.run_id = run_id
    self.trace_iterations = trace_iterations

generate classmethod #

generate(attributes: list[tuple[str, str]], content: list[list[int | float | str]]) -> OpenMLRunTrace

Generates an OpenMLRunTrace.

Generates the trace object from the attributes and content extracted while running the underlying flow.

Parameters#

attributes : list List of tuples describing the arff attributes.

content : list List of lists containing information about the individual tuning runs.

Returns#

OpenMLRunTrace

Source code in openml/runs/trace.py
@classmethod
def generate(
    cls,
    attributes: list[tuple[str, str]],
    content: list[list[int | float | str]],
) -> OpenMLRunTrace:
    """Generates an OpenMLRunTrace.

    Generates the trace object from the attributes and content extracted
    while running the underlying flow.

    Parameters
    ----------
    attributes : list
        List of tuples describing the arff attributes.

    content : list
        List of lists containing information about the individual tuning
        runs.

    Returns
    -------
    OpenMLRunTrace
    """
    if content is None:
        raise ValueError("Trace content not available.")
    if attributes is None:
        raise ValueError("Trace attributes not available.")
    if len(content) == 0:
        raise ValueError("Trace content is empty.")
    if len(attributes) != len(content[0]):
        raise ValueError(
            "Trace_attributes and trace_content not compatible:"
            f" {attributes} vs {content[0]}",
        )

    return cls._trace_from_arff_struct(
        attributes=attributes,
        content=content,
        error_message="setup_string not allowed when constructing a "
        "trace object from run results.",
    )

get_selected_iteration #

get_selected_iteration(fold: int, repeat: int) -> int

Returns the trace iteration that was marked as selected. In case multiple are marked as selected (should not happen) the first of these is returned

Parameters#

fold: int

repeat: int

Returns#

int The trace iteration from the given fold and repeat that was selected as the best iteration by the search procedure

Source code in openml/runs/trace.py
def get_selected_iteration(self, fold: int, repeat: int) -> int:
    """
    Returns the trace iteration that was marked as selected. In
    case multiple are marked as selected (should not happen) the
    first of these is returned

    Parameters
    ----------
    fold: int

    repeat: int

    Returns
    -------
    int
        The trace iteration from the given fold and repeat that was
        selected as the best iteration by the search procedure
    """
    for r, f, i in self.trace_iterations:
        if r == repeat and f == fold and self.trace_iterations[(r, f, i)].selected is True:
            return i
    raise ValueError(
        "Could not find the selected iteration for rep/fold %d/%d" % (repeat, fold),
    )

merge_traces classmethod #

merge_traces(traces: list[OpenMLRunTrace]) -> OpenMLRunTrace

Merge multiple traces into a single trace.

Parameters#

cls : type Type of the trace object to be created.
traces : List[OpenMLRunTrace] List of traces to merge.

Returns#

OpenMLRunTrace A trace object representing the merged traces.

Raises#

ValueError If the parameters in the iterations of the traces being merged are not equal. If a key (repeat, fold, iteration) is encountered twice while merging the traces.

Source code in openml/runs/trace.py
@classmethod
def merge_traces(cls, traces: list[OpenMLRunTrace]) -> OpenMLRunTrace:
    """Merge multiple traces into a single trace.

    Parameters
    ----------
    cls : type
        Type of the trace object to be created.
    traces : List[OpenMLRunTrace]
        List of traces to merge.

    Returns
    -------
    OpenMLRunTrace
        A trace object representing the merged traces.

    Raises
    ------
    ValueError
        If the parameters in the iterations of the traces being merged are not equal.
        If a key (repeat, fold, iteration) is encountered twice while merging the traces.
    """
    merged_trace: dict[tuple[int, int, int], OpenMLTraceIteration] = {}

    previous_iteration = None
    for trace in traces:
        for iteration in trace:
            key = (iteration.repeat, iteration.fold, iteration.iteration)

            assert iteration.parameters is not None
            param_keys = iteration.parameters.keys()

            if previous_iteration is not None:
                trace_itr = merged_trace[previous_iteration]

                assert trace_itr.parameters is not None
                trace_itr_keys = trace_itr.parameters.keys()

                if list(param_keys) != list(trace_itr_keys):
                    raise ValueError(
                        "Cannot merge traces because the parameters are not equal: "
                        f"{list(trace_itr.parameters.keys())} vs "
                        f"{list(iteration.parameters.keys())}",
                    )

            if key in merged_trace:
                raise ValueError(
                    f"Cannot merge traces because key '{key}' was encountered twice",
                )

            merged_trace[key] = iteration
            previous_iteration = key

    return cls(None, merged_trace)

trace_from_arff classmethod #

trace_from_arff(arff_obj: dict[str, Any]) -> OpenMLRunTrace

Generate trace from arff trace.

Creates a trace file from arff object (for example, generated by a local run).

Parameters#

arff_obj : dict LIAC arff obj, dict containing attributes, relation, data.

Returns#

OpenMLRunTrace

Source code in openml/runs/trace.py
@classmethod
def trace_from_arff(cls, arff_obj: dict[str, Any]) -> OpenMLRunTrace:
    """Generate trace from arff trace.

    Creates a trace file from arff object (for example, generated by a
    local run).

    Parameters
    ----------
    arff_obj : dict
        LIAC arff obj, dict containing attributes, relation, data.

    Returns
    -------
    OpenMLRunTrace
    """
    attributes = arff_obj["attributes"]
    content = arff_obj["data"]
    return cls._trace_from_arff_struct(
        attributes=attributes,
        content=content,
        error_message="setup_string not supported for arff serialization",
    )

trace_from_xml classmethod #

trace_from_xml(xml: str | Path | IO) -> OpenMLRunTrace

Generate trace from xml.

Creates a trace file from the xml description.

Parameters#

xml : string | file-like object An xml description that can be either a string or a file-like object.

Returns#

run : OpenMLRunTrace Object containing the run id and a dict containing the trace iterations.

Source code in openml/runs/trace.py
@classmethod
def trace_from_xml(cls, xml: str | Path | IO) -> OpenMLRunTrace:
    """Generate trace from xml.

    Creates a trace file from the xml description.

    Parameters
    ----------
    xml : string | file-like object
        An xml description that can be either a `string` or a file-like
        object.

    Returns
    -------
    run : OpenMLRunTrace
        Object containing the run id and a dict containing the trace
        iterations.
    """
    if isinstance(xml, Path):
        xml = str(xml.absolute())

    result_dict = xmltodict.parse(xml, force_list=("oml:trace_iteration",))["oml:trace"]

    run_id = result_dict["oml:run_id"]
    trace = OrderedDict()

    if "oml:trace_iteration" not in result_dict:
        raise ValueError("Run does not contain valid trace. ")
    if not isinstance(result_dict["oml:trace_iteration"], list):
        raise TypeError(type(result_dict["oml:trace_iteration"]))

    for itt in result_dict["oml:trace_iteration"]:
        repeat = int(itt["oml:repeat"])
        fold = int(itt["oml:fold"])
        iteration = int(itt["oml:iteration"])
        setup_string = json.loads(itt["oml:setup_string"])
        evaluation = float(itt["oml:evaluation"])
        selected_value = itt["oml:selected"]
        if selected_value == "true":
            selected = True
        elif selected_value == "false":
            selected = False
        else:
            raise ValueError(
                'expected {"true", "false"} value for '
                f"selected field, received: {selected_value}",
            )

        current = OpenMLTraceIteration(
            repeat=repeat,
            fold=fold,
            iteration=iteration,
            setup_string=setup_string,
            evaluation=evaluation,
            selected=selected,
        )
        trace[(repeat, fold, iteration)] = current

    return cls(run_id, trace)

trace_to_arff #

trace_to_arff() -> dict[str, Any]

Generate the arff dictionary for uploading predictions to the server.

Uses the trace object to generate an arff dictionary representation.

Returns#

arff_dict : dict Dictionary representation of the ARFF file that will be uploaded. Contains information about the optimization trace.

Source code in openml/runs/trace.py
def trace_to_arff(self) -> dict[str, Any]:
    """Generate the arff dictionary for uploading predictions to the server.

    Uses the trace object to generate an arff dictionary representation.

    Returns
    -------
    arff_dict : dict
        Dictionary representation of the ARFF file that will be uploaded.
        Contains information about the optimization trace.
    """
    if self.trace_iterations is None:
        raise ValueError("trace_iterations missing from the trace object")

    # attributes that will be in trace arff
    trace_attributes = [
        ("repeat", "NUMERIC"),
        ("fold", "NUMERIC"),
        ("iteration", "NUMERIC"),
        ("evaluation", "NUMERIC"),
        ("selected", ["true", "false"]),
    ]
    trace_attributes.extend(
        [
            (PREFIX + parameter, "STRING")
            for parameter in next(iter(self.trace_iterations.values())).get_parameters()
        ],
    )

    arff_dict: dict[str, Any] = {}
    data = []
    for trace_iteration in self.trace_iterations.values():
        tmp_list = []
        for _attr, _ in trace_attributes:
            if _attr.startswith(PREFIX):
                attr = _attr[len(PREFIX) :]
                value = trace_iteration.get_parameters()[attr]
            else:
                attr = _attr
                value = getattr(trace_iteration, attr)

            if attr == "selected":
                tmp_list.append("true" if value else "false")
            else:
                tmp_list.append(value)
        data.append(tmp_list)

    arff_dict["attributes"] = trace_attributes
    arff_dict["data"] = data
    # TODO allow to pass a trace description when running a flow
    arff_dict["relation"] = "Trace"
    return arff_dict

OpenMLTraceIteration dataclass #

OpenMLTraceIteration(repeat: int, fold: int, iteration: int, evaluation: float, selected: bool, setup_string: dict[str, str] | None = None, parameters: dict[str, str | int | float] | None = None)

OpenML Trace Iteration: parsed output from Run Trace call. Exactly one of setup_string or parameters must be provided.

Parameters#

repeat : int repeat number (in case of no repeats: 0)

fold : int fold number (in case of no folds: 0)

iteration : int iteration number of optimization procedure

setup_string : str, optional json string representing the parameters. If not provided, parameters should be set.

evaluation : double The evaluation that was awarded to this trace iteration. Measure is defined by the task.

selected : bool Whether this was the best of all iterations, and hence selected for making predictions. Per fold/repeat there should be only one iteration selected.

parameters : OrderedDict, optional Dictionary specifying parameter names and their values. If not provided, setup_string should be set.

get_parameters #

get_parameters() -> dict[str, Any]

Get the parameters of this trace iteration.

Source code in openml/runs/trace.py
def get_parameters(self) -> dict[str, Any]:
    """Get the parameters of this trace iteration."""
    # parameters have prefix 'parameter_'
    if self.setup_string:
        return {
            param[len(PREFIX) :]: json.loads(value)
            for param, value in self.setup_string.items()
        }

    assert self.parameters is not None
    return {param[len(PREFIX) :]: value for param, value in self.parameters.items()}

delete_run #

delete_run(run_id: int) -> bool

Delete run with id run_id from the OpenML server.

You can only delete runs which you uploaded.

Parameters#

run_id : int OpenML id of the run

Returns#

bool True if the deletion was successful. False otherwise.

Source code in openml/runs/functions.py
def delete_run(run_id: int) -> bool:
    """Delete run with id `run_id` from the OpenML server.

    You can only delete runs which you uploaded.

    Parameters
    ----------
    run_id : int
        OpenML id of the run

    Returns
    -------
    bool
        True if the deletion was successful. False otherwise.
    """
    return openml.utils._delete_entity("run", run_id)

get_run #

get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun

Gets run corresponding to run_id.

Parameters#

run_id : int

ignore_cache : bool Whether to ignore the cache. If true this will download and overwrite the run xml even if the requested run is already cached.

Returns#

run : OpenMLRun Run corresponding to ID, fetched from the server.

Source code in openml/runs/functions.py
@openml.utils.thread_safe_if_oslo_installed
def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun:  # noqa: FBT002, FBT001
    """Gets run corresponding to run_id.

    Parameters
    ----------
    run_id : int

    ignore_cache : bool
        Whether to ignore the cache. If ``true`` this will download and overwrite the run xml
        even if the requested run is already cached.

    ignore_cache

    Returns
    -------
    run : OpenMLRun
        Run corresponding to ID, fetched from the server.
    """
    run_dir = Path(openml.utils._create_cache_directory_for_id(RUNS_CACHE_DIR_NAME, run_id))
    run_file = run_dir / "description.xml"

    run_dir.mkdir(parents=True, exist_ok=True)

    try:
        if not ignore_cache:
            return _get_cached_run(run_id)

        raise OpenMLCacheException(message="dummy")

    except OpenMLCacheException:
        run_xml = openml._api_calls._perform_api_call("run/%d" % run_id, "get")
        with run_file.open("w", encoding="utf8") as fh:
            fh.write(run_xml)

    return _create_run_from_xml(run_xml)
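
A minimal sketch, assuming the run id exists on the server:

import openml

run = openml.runs.get_run(10)   # illustrative run id; downloads or reads from the local cache
print(run.task_id, run.flow_name, run.evaluations)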

get_run_trace #

get_run_trace(run_id: int) -> OpenMLRunTrace

Get the optimization trace object for a given run id.

Parameters#

run_id : int

Returns#

openml.runs.OpenMLTrace

Source code in openml/runs/functions.py
def get_run_trace(run_id: int) -> OpenMLRunTrace:
    """
    Get the optimization trace object for a given run id.

    Parameters
    ----------
    run_id : int

    Returns
    -------
    openml.runs.OpenMLTrace
    """
    trace_xml = openml._api_calls._perform_api_call("run/trace/%d" % run_id, "get")
    return OpenMLRunTrace.trace_from_xml(trace_xml)
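
A hedged sketch, assuming the run was produced by a hyperparameter search and therefore has a trace stored on the server (the run id is illustrative):

import openml

trace = openml.runs.get_run_trace(1234)
best = trace.get_selected_iteration(fold=0, repeat=0)   # iteration marked as selected
print(trace.trace_iterations[(0, 0, best)])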

get_runs #

get_runs(run_ids: list[int]) -> list[OpenMLRun]

Gets all runs in run_ids list.

Parameters#

run_ids : list of ints

Returns#

runs : list of OpenMLRun List of runs corresponding to IDs, fetched from the server.

Source code in openml/runs/functions.py
def get_runs(run_ids: list[int]) -> list[OpenMLRun]:
    """Gets all runs in run_ids list.

    Parameters
    ----------
    run_ids : list of ints

    Returns
    -------
    runs : list of OpenMLRun
        List of runs corresponding to IDs, fetched from the server.
    """
    runs = []
    for run_id in run_ids:
        runs.append(get_run(run_id))
    return runs

initialize_model_from_run #

initialize_model_from_run(run_id: int, *, strict_version: bool = True) -> Any

Initialize a model based on a run_id (i.e., using the exact same parameter settings)

Parameters#

run_id : int The Openml run_id
strict_version: bool (default=True) See flow_to_model strict_version.

Returns#

model

Source code in openml/runs/functions.py
def initialize_model_from_run(run_id: int, *, strict_version: bool = True) -> Any:
    """
    Initialized a model based on a run_id (i.e., using the exact
    same parameter settings)

    Parameters
    ----------
    run_id : int
        The Openml run_id
    strict_version: bool (default=True)
        See `flow_to_model` strict_version.

    Returns
    -------
    model
    """
    run = get_run(run_id)
    # TODO(eddiebergman): I imagine this is None if it's not published,
    # might need to raise an explicit error for that
    assert run.setup_id is not None
    return initialize_model(setup_id=run.setup_id, strict_version=strict_version)
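
A minimal sketch, assuming the run id exists and its flow can be instantiated by an installed extension:

import openml

# Rebuild the unfitted model with exactly the parameter settings used in the run
model = openml.runs.initialize_model_from_run(10)   # illustrative run id
print(model)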

initialize_model_from_trace #

initialize_model_from_trace(run_id: int, repeat: int, fold: int, iteration: int | None = None) -> Any

Initialize a model based on the parameters that were set by an optimization procedure (i.e., using the exact same parameter settings)

Parameters#

run_id : int The Openml run_id. Should contain a trace file, otherwise a OpenMLServerException is raised

repeat : int The repeat nr (column in trace file)

fold : int The fold nr (column in trace file)

iteration : int The iteration nr (column in trace file). If None, the best (selected) iteration will be searched (slow), according to the selection criteria implemented in OpenMLRunTrace.get_selected_iteration

Returns#

model

Source code in openml/runs/functions.py
def initialize_model_from_trace(
    run_id: int,
    repeat: int,
    fold: int,
    iteration: int | None = None,
) -> Any:
    """
    Initialize a model based on the parameters that were set
    by an optimization procedure (i.e., using the exact same
    parameter settings)

    Parameters
    ----------
    run_id : int
        The Openml run_id. Should contain a trace file,
        otherwise a OpenMLServerException is raised

    repeat : int
        The repeat nr (column in trace file)

    fold : int
        The fold nr (column in trace file)

    iteration : int
        The iteration nr (column in trace file). If None, the
        best (selected) iteration will be searched (slow),
        according to the selection criteria implemented in
        OpenMLRunTrace.get_selected_iteration

    Returns
    -------
    model
    """
    run = get_run(run_id)
    # TODO(eddiebergman): I imagine this is None if it's not published,
    # might need to raise an explicit error for that
    assert run.flow_id is not None

    flow = get_flow(run.flow_id)
    run_trace = get_run_trace(run_id)

    if iteration is None:
        iteration = run_trace.get_selected_iteration(repeat, fold)

    request = (repeat, fold, iteration)
    if request not in run_trace.trace_iterations:
        raise ValueError("Combination repeat, fold, iteration not available")
    current = run_trace.trace_iterations[(repeat, fold, iteration)]

    search_model = initialize_model_from_run(run_id)
    return flow.extension.instantiate_model_from_hpo_class(search_model, current)
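
A hedged sketch, assuming the run id refers to a traced hyperparameter search whose flow can be instantiated by an installed extension:

import openml

# Recreate the model with the hyperparameters selected for repeat 0 / fold 0
model = openml.runs.initialize_model_from_trace(run_id=1234, repeat=0, fold=0)
print(model)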

list_runs #

list_runs(offset: int | None = None, size: int | None = None, id: list | None = None, task: list[int] | None = None, setup: list | None = None, flow: list | None = None, uploader: list | None = None, tag: str | None = None, study: int | None = None, display_errors: bool = False, task_type: TaskType | int | None = None) -> DataFrame

List all runs matching all of the given filters. (Supports large amount of results)

Parameters#

offset : int, optional the number of runs to skip, starting from the first

size : int, optional the maximum number of runs to show

id : list, optional

task : list, optional

setup: list, optional

flow : list, optional

uploader : list, optional

tag : str, optional

study : int, optional

display_errors : bool, optional (default=None) Whether to list runs which have an error (for example a missing prediction file).

task_type : str, optional

Returns#

dataframe

Source code in openml/runs/functions.py
def list_runs(  # noqa: PLR0913
    offset: int | None = None,
    size: int | None = None,
    id: list | None = None,  # noqa: A002
    task: list[int] | None = None,
    setup: list | None = None,
    flow: list | None = None,
    uploader: list | None = None,
    tag: str | None = None,
    study: int | None = None,
    display_errors: bool = False,  # noqa: FBT001, FBT002
    task_type: TaskType | int | None = None,
) -> pd.DataFrame:
    """
    List all runs matching all of the given filters.
    (Supports large amount of results)

    Parameters
    ----------
    offset : int, optional
        the number of runs to skip, starting from the first
    size : int, optional
        the maximum number of runs to show

    id : list, optional

    task : list, optional

    setup: list, optional

    flow : list, optional

    uploader : list, optional

    tag : str, optional

    study : int, optional

    display_errors : bool, optional (default=None)
        Whether to list runs which have an error (for example a missing
        prediction file).

    task_type : str, optional

    Returns
    -------
    dataframe
    """
    if id is not None and (not isinstance(id, list)):
        raise TypeError("id must be of type list.")
    if task is not None and (not isinstance(task, list)):
        raise TypeError("task must be of type list.")
    if setup is not None and (not isinstance(setup, list)):
        raise TypeError("setup must be of type list.")
    if flow is not None and (not isinstance(flow, list)):
        raise TypeError("flow must be of type list.")
    if uploader is not None and (not isinstance(uploader, list)):
        raise TypeError("uploader must be of type list.")

    listing_call = partial(
        _list_runs,
        id=id,
        task=task,
        setup=setup,
        flow=flow,
        uploader=uploader,
        tag=tag,
        study=study,
        display_errors=display_errors,
        task_type=task_type,
    )
    batches = openml.utils._list_all(listing_call, offset=offset, limit=size)
    if len(batches) == 0:
        return pd.DataFrame()

    return pd.concat(batches)
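
A minimal sketch, assuming the filter values below correspond to existing entities (ids are illustrative):

import openml

# At most 100 runs on task 31 uploaded by user 1, returned as a pandas DataFrame
df = openml.runs.list_runs(task=[31], uploader=[1], size=100)
print(df.head())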

run_exists #

run_exists(task_id: int, setup_id: int) -> set[int]

Checks whether a task/setup combination is already present on the server.

Parameters#

task_id : int

setup_id : int

Returns#
Set run ids for runs where flow setup_id was run on task_id. Empty set if it wasn't run yet.

Source code in openml/runs/functions.py
def run_exists(task_id: int, setup_id: int) -> set[int]:
    """Checks whether a task/setup combination is already present on the
    server.

    Parameters
    ----------
    task_id : int

    setup_id : int

    Returns
    -------
        Set run ids for runs where flow setup_id was run on task_id. Empty
        set if it wasn't run yet.
    """
    if setup_id <= 0:
        # openml setups are in range 1-inf
        return set()

    try:
        result = list_runs(task=[task_id], setup=[setup_id])
        return set() if result.empty else set(result["run_id"])
    except OpenMLServerException as exception:
        # error code implies no results. The run does not exist yet
        if exception.code != ERROR_CODE:
            raise exception
        return set()

run_flow_on_task #

run_flow_on_task(flow: OpenMLFlow, task: OpenMLTask, avoid_duplicate_runs: bool | None = None, flow_tags: list[str] | None = None, seed: int | None = None, add_local_measures: bool = True, upload_flow: bool = False, n_jobs: int | None = None) -> OpenMLRun

Run the model provided by the flow on the dataset defined by task.

Takes the flow and repeat information into account. The Flow may optionally be published.

Parameters#

flow : OpenMLFlow A flow wraps a machine learning model together with relevant information. The model has a function fit(X,Y) and predict(X), all supervised estimators of scikit learn follow this definition of a model.
task : OpenMLTask Task to perform. This may be an OpenMLFlow instead if the first argument is an OpenMLTask.
avoid_duplicate_runs : bool, optional (default=None) If True, the run will throw an error if the setup/task combination is already present on the server. This feature requires an internet connection. If not set, it will use the default from your openml configuration (False if unset).
flow_tags : List[str], optional (default=None) A list of tags that the flow should have at creation.
seed: int, optional (default=None) Models that are not seeded will get this seed.
add_local_measures : bool, optional (default=True) Determines whether to calculate a set of evaluation measures locally, to later verify server behaviour.
upload_flow : bool (default=False) If True, upload the flow to OpenML if it does not exist yet. If False, do not upload the flow to OpenML.
n_jobs : int (default=None) The number of processes/threads to distribute the evaluation asynchronously. If None or 1, then the evaluation is treated as synchronous and processed sequentially. If -1, then the job uses as many cores available.

Returns#

run : OpenMLRun Result of the run.

Source code in openml/runs/functions.py
def run_flow_on_task(  # noqa: C901, PLR0912, PLR0915, PLR0913
    flow: OpenMLFlow,
    task: OpenMLTask,
    avoid_duplicate_runs: bool | None = None,
    flow_tags: list[str] | None = None,
    seed: int | None = None,
    add_local_measures: bool = True,  # noqa: FBT001, FBT002
    upload_flow: bool = False,  # noqa: FBT001, FBT002
    n_jobs: int | None = None,
) -> OpenMLRun:
    """Run the model provided by the flow on the dataset defined by task.

    Takes the flow and repeat information into account.
    The Flow may optionally be published.

    Parameters
    ----------
    flow : OpenMLFlow
        A flow wraps a machine learning model together with relevant information.
        The model has a function fit(X,Y) and predict(X),
        all supervised estimators of scikit learn follow this definition of a model.
    task : OpenMLTask
        Task to perform. This may be an OpenMLFlow instead if the first argument is an OpenMLTask.
    avoid_duplicate_runs : bool, optional (default=None)
        If True, the run will throw an error if the setup/task combination is already present on
        the server. This feature requires an internet connection.
        If not set, it will use the default from your openml configuration (False if unset).
    flow_tags : List[str], optional (default=None)
        A list of tags that the flow should have at creation.
    seed: int, optional (default=None)
        Models that are not seeded will get this seed.
    add_local_measures : bool, optional (default=True)
        Determines whether to calculate a set of evaluation measures locally,
        to later verify server behaviour.
    upload_flow : bool (default=False)
        If True, upload the flow to OpenML if it does not exist yet.
        If False, do not upload the flow to OpenML.
    n_jobs : int (default=None)
        The number of processes/threads to distribute the evaluation asynchronously.
        If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
        If `-1`, then the job uses as many cores available.

    Returns
    -------
    run : OpenMLRun
        Result of the run.
    """
    if flow_tags is not None and not isinstance(flow_tags, list):
        raise ValueError("flow_tags should be a list")

    if avoid_duplicate_runs is None:
        avoid_duplicate_runs = openml.config.avoid_duplicate_runs

    # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
    # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
    if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
        # We want to allow either order of argument (to avoid confusion).
        warnings.warn(
            "The old argument order (Flow, model) is deprecated and "
            "will not be supported in the future. Please use the "
            "order (model, Flow).",
            DeprecationWarning,
            stacklevel=2,
        )
        task, flow = flow, task

    if task.task_id is None:
        raise ValueError("The task should be published at OpenML")

    if flow.model is None:
        flow.model = flow.extension.flow_to_model(flow)

    flow.model = flow.extension.seed_model(flow.model, seed=seed)

    # We only need to sync with the server right now if we want to upload the flow,
    # or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
    flow_id = None
    if upload_flow or avoid_duplicate_runs:
        flow_id = flow_exists(flow.name, flow.external_version)
        if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
            if flow_id is not False:
                raise PyOpenMLError(
                    f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
                )
            raise PyOpenMLError(
                "Flow does not exist on the server, but 'flow.flow_id' is not None."
            )
        if upload_flow and flow_id is False:
            flow.publish()
            flow_id = flow.flow_id
        elif flow_id:
            flow_from_server = get_flow(flow_id)
            _copy_server_fields(flow_from_server, flow)
            if avoid_duplicate_runs:
                flow_from_server.model = flow.model
                setup_id = setup_exists(flow_from_server)
                ids = run_exists(task.task_id, setup_id)
                if ids:
                    error_message = (
                        "One or more runs of this setup were already performed on the task."
                    )
                    raise OpenMLRunsExistError(ids, error_message)
        else:
            # Flow does not exist on server and we do not want to upload it.
            # No sync with the server happens.
            flow_id = None

    dataset = task.get_dataset()

    run_environment = flow.extension.get_version_information()
    tags = ["openml-python", run_environment[1]]

    if flow.extension.check_if_model_fitted(flow.model):
        warnings.warn(
            "The model is already fitted! This might cause inconsistency in comparison of results.",
            RuntimeWarning,
            stacklevel=2,
        )

    # execute the run
    res = _run_task_get_arffcontent(
        model=flow.model,
        task=task,
        extension=flow.extension,
        add_local_measures=add_local_measures,
        n_jobs=n_jobs,
    )

    data_content, trace, fold_evaluations, sample_evaluations = res
    fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]
    generated_description = "\n".join(fields)
    run = OpenMLRun(
        task_id=task.task_id,
        flow_id=flow_id,
        dataset_id=dataset.dataset_id,
        model=flow.model,
        flow_name=flow.name,
        tags=tags,
        trace=trace,
        data_content=data_content,
        flow=flow,
        setup_string=flow.extension.create_setup_string(flow.model),
        description_text=generated_description,
    )

    if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None:
        # We only extract the parameter settings if a sync happened with the server.
        # I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
        # Otherwise, we will do this at upload time.
        run.parameter_settings = flow.extension.obtain_parameter_values(flow)

    # now we need to attach the detailed evaluations
    if task.task_type_id == TaskType.LEARNING_CURVE:
        run.sample_evaluations = sample_evaluations
    else:
        run.fold_evaluations = fold_evaluations

    if flow_id:
        message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}"
    else:
        message = f"Executed Task {task.task_id} on local Flow with name {flow.name}."
    config.logger.info(message)

    return run
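
A hedged sketch, assuming the scikit-learn extension is installed so the model can be converted into a flow, and that task 31 exists on the server:

import openml
from sklearn.tree import DecisionTreeClassifier

model = DecisionTreeClassifier()
extension = openml.extensions.get_extension_by_model(model)   # registered extension for this model
flow = extension.model_to_flow(model)
task = openml.tasks.get_task(31)                              # illustrative task id
run = openml.runs.run_flow_on_task(flow, task, avoid_duplicate_runs=False)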

run_model_on_task #

run_model_on_task(model: Any, task: int | str | OpenMLTask, avoid_duplicate_runs: bool | None = None, flow_tags: list[str] | None = None, seed: int | None = None, add_local_measures: bool = True, upload_flow: bool = False, return_flow: bool = False, n_jobs: int | None = None) -> OpenMLRun | tuple[OpenMLRun, OpenMLFlow]

Run the model on the dataset defined by the task.

Parameters#

model : sklearn model A model which has a function fit(X,Y) and predict(X), all supervised estimators of scikit learn follow this definition of a model.
task : OpenMLTask or int or str Task to perform or Task id. This may be a model instead if the first argument is an OpenMLTask.
avoid_duplicate_runs : bool, optional (default=None) If True, the run will throw an error if the setup/task combination is already present on the server. This feature requires an internet connection. If not set, it will use the default from your openml configuration (False if unset).
flow_tags : List[str], optional (default=None) A list of tags that the flow should have at creation.
seed: int, optional (default=None) Models that are not seeded will get this seed.
add_local_measures : bool, optional (default=True) Determines whether to calculate a set of evaluation measures locally, to later verify server behaviour.
upload_flow : bool (default=False) If True, upload the flow to OpenML if it does not exist yet. If False, do not upload the flow to OpenML.
return_flow : bool (default=False) If True, returns the OpenMLFlow generated from the model in addition to the OpenMLRun.
n_jobs : int (default=None) The number of processes/threads to distribute the evaluation asynchronously. If None or 1, then the evaluation is treated as synchronous and processed sequentially. If -1, then the job uses as many cores available.

Returns#

run : OpenMLRun Result of the run.
flow : OpenMLFlow (optional, only if return_flow is True). Flow generated from the model.

Source code in openml/runs/functions.py
def run_model_on_task(  # noqa: PLR0913
    model: Any,
    task: int | str | OpenMLTask,
    avoid_duplicate_runs: bool | None = None,
    flow_tags: list[str] | None = None,
    seed: int | None = None,
    add_local_measures: bool = True,  # noqa: FBT001, FBT002
    upload_flow: bool = False,  # noqa: FBT001, FBT002
    return_flow: bool = False,  # noqa: FBT001, FBT002
    n_jobs: int | None = None,
) -> OpenMLRun | tuple[OpenMLRun, OpenMLFlow]:
    """Run the model on the dataset defined by the task.

    Parameters
    ----------
    model : sklearn model
        A model which has a function fit(X,Y) and predict(X),
        all supervised estimators of scikit learn follow this definition of a model.
    task : OpenMLTask or int or str
        Task to perform or Task id.
        This may be a model instead if the first argument is an OpenMLTask.
    avoid_duplicate_runs : bool, optional (default=None)
        If True, the run will throw an error if the setup/task combination is already present on
        the server. This feature requires an internet connection.
        If not set, it will use the default from your openml configuration (False if unset).
    flow_tags : List[str], optional (default=None)
        A list of tags that the flow should have at creation.
    seed: int, optional (default=None)
        Models that are not seeded will get this seed.
    add_local_measures : bool, optional (default=True)
        Determines whether to calculate a set of evaluation measures locally,
        to later verify server behaviour.
    upload_flow : bool (default=False)
        If True, upload the flow to OpenML if it does not exist yet.
        If False, do not upload the flow to OpenML.
    return_flow : bool (default=False)
        If True, returns the OpenMLFlow generated from the model in addition to the OpenMLRun.
    n_jobs : int (default=None)
        The number of processes/threads to distribute the evaluation asynchronously.
        If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
        If `-1`, then the job uses as many cores available.

    Returns
    -------
    run : OpenMLRun
        Result of the run.
    flow : OpenMLFlow (optional, only if `return_flow` is True).
        Flow generated from the model.
    """
    if avoid_duplicate_runs is None:
        avoid_duplicate_runs = openml.config.avoid_duplicate_runs
    if avoid_duplicate_runs and not config.apikey:
        warnings.warn(
            "avoid_duplicate_runs is set to True, but no API key is set. "
            "Please set your API key in the OpenML configuration file, see"
            "https://openml.github.io/openml-python/main/examples/20_basic/introduction_tutorial"
            ".html#authentication for more information on authentication.",
            RuntimeWarning,
            stacklevel=2,
        )

    # TODO: At some point in the future do not allow for arguments in old order (6-2018).
    # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
    # When removing this please also remove the method `is_estimator` from the extension
    # interface as it is only used here (MF, 3-2019)
    if isinstance(model, (int, str, OpenMLTask)):
        warnings.warn(
            "The old argument order (task, model) is deprecated and "
            "will not be supported in the future. Please use the "
            "order (model, task).",
            DeprecationWarning,
            stacklevel=2,
        )
        task, model = model, task

    extension = get_extension_by_model(model, raise_if_no_extension=True)
    if extension is None:
        # This should never happen and is only here to please mypy will be gone soon once the
        # whole function is removed
        raise TypeError(extension)

    flow = extension.model_to_flow(model)

    def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask:
        """Retrieve an OpenMLTask object from either an integer or string ID,
        or directly from an OpenMLTask object.

        Parameters
        ----------
        _task : Union[int, str, OpenMLTask]
            The task ID or the OpenMLTask object.

        Returns
        -------
        OpenMLTask
            The OpenMLTask object.
        """
        if isinstance(_task, (int, str)):
            return get_task(int(_task))  # type: ignore

        return _task

    task = get_task_and_type_conversion(task)

    run = run_flow_on_task(
        task=task,
        flow=flow,
        avoid_duplicate_runs=avoid_duplicate_runs,
        flow_tags=flow_tags,
        seed=seed,
        add_local_measures=add_local_measures,
        upload_flow=upload_flow,
        n_jobs=n_jobs,
    )
    if return_flow:
        return run, flow
    return run
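
A minimal end-to-end sketch, assuming the scikit-learn extension is installed, task 31 exists on the server, and an API key is configured for the final upload:

import openml
from sklearn.ensemble import RandomForestClassifier

task = openml.tasks.get_task(31)                                    # illustrative task id
run = openml.runs.run_model_on_task(RandomForestClassifier(), task)
run.publish()                                                       # uploads the run (and its flow) to OpenML
print(f"Uploaded run with id {run.run_id}")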