Skip to content

LudwigModel

LudwigModel

ludwig.api.LudwigModel

LudwigModel(config: str | dict, logging_level: int = logging.ERROR, backend: Backend | str | None = None, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] | None = None)

High-level interface to Ludwig's train / predict / evaluate / experiment pipelines.

Example

Train a model::

config = {...}
model = LudwigModel(config)
train_stats, _, _ = model.train(dataset=file_path)
# or with a DataFrame:
train_stats, _, _ = model.train(dataset=dataframe)

Load a previously trained model and predict::

model = LudwigModel.load(model_dir)
predictions, output_dir = model.predict(dataset=file_path)
# or:
predictions, output_dir = model.predict(dataset=dataframe)

Evaluate::

eval_stats, _, _ = model.evaluate(dataset=file_path)

Initialize a LudwigModel.

PARAMETER DESCRIPTION
config

In-memory config dict or path to a YAML config file.

TYPE: str | dict

logging_level

Log level sent to stderr (e.g., logging.INFO).

TYPE: int DEFAULT: ERROR

backend

Backend instance or string name (e.g., "local", "ray") used for preprocessing and training.

TYPE: Backend | str | None DEFAULT: None

gpus

GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.

TYPE: str | int | list[int] | None DEFAULT: None

gpu_memory_limit

Maximum memory fraction [0, 1] allowed per GPU device.

TYPE: float | None DEFAULT: None

allow_parallel_threads

Allow Torch to use multi-threading for performance at the cost of determinism.

TYPE: bool DEFAULT: True

callbacks

List of ludwig.callbacks.Callback objects that provide hooks into the Ludwig pipeline.

TYPE: list[Callback] | None DEFAULT: None

Source code in ludwig/api.py
def __init__(
    self,
    config: str | dict,
    logging_level: int = logging.ERROR,
    backend: Backend | str | None = None,
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] | None = None,
) -> None:
    """Build a LudwigModel from a config and prepare its backend.

    No model is created here (``self.model`` starts as ``None``), except in the
    zero-shot LLM case handled at the end of this method.

    Args:
        config: Either a config dict (deep-copied, so the caller's dict is not
            shared) or a string, which is treated as a path to a YAML file.
        logging_level: Level passed to `set_logging_level` (e.g., logging.INFO).
        backend: Backend instance or string name (e.g., "local", "ray"). When
            falsy, the `backend` entry of the config is used instead.
        gpus: GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.
        gpu_memory_limit: Maximum memory fraction [0, 1] allowed per GPU device.
        allow_parallel_threads: Allow Torch to use multi-threading for performance
            at the cost of determinism.
        callbacks: `ludwig.callbacks.Callback` objects hooked into the pipeline;
            `None` is stored as an empty list.
    """
    # A string is taken to be a YAML path; anything else is an in-memory config.
    config_is_path = isinstance(config, str)
    raw_config = load_yaml(config) if config_is_path else copy.deepcopy(config)
    self.config_fp = config if config_is_path else None

    # Keep the upgraded user config, then render the full config object from it.
    self._user_config = upgrade_config_dict_to_latest_version(raw_config)
    self.config_obj = ModelConfig.from_dict(self._user_config)

    # Logging is configured before the backend so the message below honours it.
    self.set_logging_level(logging_level)

    # An explicit backend argument wins over the one named in the config.
    backend_spec = backend or self._user_config.get("backend")
    self.backend = initialize_backend(backend_spec)
    logger.info(f"Using backend: {self.backend.BACKEND_TYPE}")
    self.callbacks = [] if callbacks is None else callbacks

    # PyTorch environment (GPU allocation, threading).
    self.backend.initialize_pytorch(
        gpus=gpus,
        gpu_memory_limit=gpu_memory_limit,
        allow_parallel_threads=allow_parallel_threads,
    )

    # Model, metadata and online-training state are all filled in lazily.
    self.model = None
    self.training_set_metadata: dict[str, dict] | None = None
    self._online_trainer = None

    # Zero-shot LLM usage. Category output features require a vocabulary, so only
    # text outputs are initialized here; otherwise the LLM LudwigModel should be
    # initialized with model.train(dataset).
    wants_zero_shot_llm = (
        self.config_obj.model_type == MODEL_LLM
        and self.config_obj.trainer.type == "none"
        and self.config_obj.output_features[0].type == "text"
    )
    if wants_zero_shot_llm:
        self._initialize_llm_for_zero_shot()

config property writable

config: ModelConfigDict

Returns the fully-rendered config of this model including default values.

train

train(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | Dataset | None = None, validation_set: str | dict | DataFrame | Dataset | None = None, test_set: str | dict | DataFrame | Dataset | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, experiment_name: str = 'api_experiment', model_name: str = 'run', model_resume_path: str | None = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, output_directory: str | None = 'results', random_seed: int = default_random_seed, callbacks: list[Callback] | None = None, **kwargs: Any) -> TrainingResults

Train the model on the provided dataset.

Results are saved to [output_directory]/[experiment_name]_[model_name]_n, where n increments to differentiate repeated runs.

PARAMETER DESCRIPTION
dataset

Source containing the full dataset. If it has a split column (0=train, 1=validation, 2=test) it is used for splitting; otherwise the dataset is split randomly. Mutually exclusive with training_set.

TYPE: str | dict | DataFrame | None DEFAULT: None

training_set

Source containing training data only.

TYPE: str | dict | DataFrame | Dataset | None DEFAULT: None

validation_set

Source containing validation data only.

TYPE: str | dict | DataFrame | Dataset | None DEFAULT: None

test_set

Source containing test data only.

TYPE: str | dict | DataFrame | Dataset | None DEFAULT: None

training_set_metadata

Pre-computed metadata dict or path to a .meta.json file produced by a previous Ludwig run on the same dataset.

TYPE: str | dict | None DEFAULT: None

data_format

Format hint for data sources. Inferred automatically when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

experiment_name

Name used when creating the output directory.

TYPE: str DEFAULT: 'api_experiment'

model_name

Name used when creating the output directory.

TYPE: str DEFAULT: 'run'

model_resume_path

Resume training from this checkpoint directory. Config, optimizer state, and training statistics are all restored.

TYPE: str | None DEFAULT: None

skip_save_training_description

Skip saving the experiment description JSON.

TYPE: bool DEFAULT: False

skip_save_training_statistics

Skip saving training statistics JSON.

TYPE: bool DEFAULT: False

skip_save_model

Skip saving model weights after each improvement. The returned model will have end-of-training weights rather than best-validation weights, and the model cannot be reloaded later.

TYPE: bool DEFAULT: False

skip_save_progress

Skip saving per-epoch checkpoints used for resuming.

TYPE: bool DEFAULT: False

skip_save_log

Skip saving TensorBoard logs.

TYPE: bool DEFAULT: False

skip_save_processed_input

Skip caching the preprocessed HDF5/JSON files.

TYPE: bool DEFAULT: False

output_directory

Root directory for all saved outputs.

TYPE: str | None DEFAULT: 'results'

random_seed

Seed for data splitting, weight initialization, and shuffling.

TYPE: int DEFAULT: default_random_seed

callbacks

Additional callbacks for this specific train() call. These are merged with any callbacks already attached to the model via LudwigModel(callbacks=[...]). Useful for per-run instrumentation (e.g., attaching a WandB logger to one run without rebuilding the model). Callbacks added here do not persist after this call returns.

TYPE: list[Callback] | None DEFAULT: None

**kwargs

Additional keyword arguments forwarded to preprocessing.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
TrainingResults

A TrainingResults namedtuple with fields, in order:

  • training statistics (train_stats): training, validation, and test metrics collected during training.
  • preprocessed_data: (training_set, validation_set, test_set) datasets.
  • output_directory: path where all outputs were saved.
Source code in ludwig/api.py
def train(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | Dataset | None = None,
    validation_set: str | dict | pd.DataFrame | Dataset | None = None,
    test_set: str | dict | pd.DataFrame | Dataset | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    experiment_name: str = "api_experiment",
    model_name: str = "run",
    model_resume_path: str | None = None,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = False,
    output_directory: str | None = "results",
    random_seed: int = default_random_seed,
    callbacks: list[Callback] | None = None,
    **kwargs: Any,
) -> TrainingResults:
    """Train the model on the provided dataset.

    Results are saved to `[output_directory]/[experiment_name]_[model_name]_n`,
    where `n` increments to differentiate repeated runs.

    If this model already carries training set metadata from an earlier call,
    that metadata overrides the `training_set_metadata` argument and a warning
    is logged: the new data is mapped onto the existing architecture.

    Args:
        dataset: Source containing the full dataset. If it has a split column (0=train,
            1=validation, 2=test) it is used for splitting; otherwise the dataset is
            split randomly. Mutually exclusive with `training_set`.
        training_set: Source containing training data only. When this is already a
            `Dataset` and metadata is available, preprocessing is skipped.
        validation_set: Source containing validation data only.
        test_set: Source containing test data only.
        training_set_metadata: Pre-computed metadata dict or path to a `.meta.json`
            file produced by a previous Ludwig run on the same dataset.
        data_format: Format hint for data sources. Inferred automatically when
            `None`. Valid values: `'auto'`, `'csv'`, `'df'`, `'dict'`,
            `'excel'`, `'feather'`, `'fwf'`, `'hdf5'`, `'html'`, `'json'`,
            `'jsonl'`, `'parquet'`, `'pickle'`, `'sas'`, `'spss'`, `'stata'`,
            `'tsv'`.
        experiment_name: Name used when creating the output directory.
        model_name: Name used when creating the output directory.
        model_resume_path: Resume training from this checkpoint directory.
            Config, optimizer state, and training statistics are all restored.
            If the path does not exist, training starts from scratch instead.
        skip_save_training_description: Skip saving the experiment description JSON.
        skip_save_training_statistics: Skip saving training statistics JSON.
        skip_save_model: Skip saving model weights after each improvement.
            The returned model will have end-of-training weights rather than
            best-validation weights, and the model cannot be reloaded later.
            Also skips the training set metadata, config, model card, and
            training report that are written alongside the model.
        skip_save_progress: Skip saving per-epoch checkpoints used for resuming.
        skip_save_log: Skip saving TensorBoard logs.
        skip_save_processed_input: Skip caching the preprocessed HDF5/JSON files.
        output_directory: Root directory for all saved outputs.
        random_seed: Seed for data splitting, weight initialization, and shuffling.
        callbacks: Additional callbacks for this specific ``train()`` call. These are
            merged with any callbacks already attached to the model via
            ``LudwigModel(callbacks=[...])``. Useful for per-run instrumentation
            (e.g., attaching a WandB logger to one run without rebuilding the model).
            Callbacks added here do not persist after this call returns.
        **kwargs: Additional keyword arguments forwarded to preprocessing.

    Returns:
        A `TrainingResults` constructed positionally from, in order:
        - the training statistics: a `TrainingStats` holding the reduced training,
          validation, and test metrics plus the evaluation frequency.
        - `preprocessed_data`: the preprocessed training, validation, and test
          datasets together with the training set metadata.
        - the output location, captured before `upload_output_directory` is
          entered (`None` on non-coordinator workers when not resuming).
    """
    # Only reset the metadata if the model has not been trained before
    if self.training_set_metadata:
        logger.warning(
            "This model has been trained before. Its architecture has been defined by the original training set "
            "(for example, the number of possible categorical outputs). The current training data will be mapped "
            "to this architecture. If you want to change the architecture of the model, please concatenate your "
            "new training data with the original and train a new model from scratch."
        )
        training_set_metadata = self.training_set_metadata

    if self._user_config.get(HYPEROPT):
        print_boxed("WARNING")
        logger.warning(HYPEROPT_WARNING)

    # setup directories and file names
    if model_resume_path is not None:
        if path_exists(model_resume_path):
            output_directory = model_resume_path
            if self.backend.is_coordinator():
                logger.info(f"Model resume path '{model_resume_path}' exists, trying to resume training.")
        else:
            if self.backend.is_coordinator():
                logger.info(
                    f"Model resume path '{model_resume_path}' does not exist, starting training from scratch"
                )
            # Fall through to the fresh-run branch below.
            model_resume_path = None

    if model_resume_path is None:
        # Only the coordinator resolves a fresh output directory; other workers get None.
        if self.backend.is_coordinator():
            output_directory = get_output_directory(output_directory, experiment_name, model_name)
        else:
            output_directory = None

    # if we are skipping all saving,
    # there is no need to create a directory that will remain empty
    should_create_output_directory = not (
        skip_save_training_description
        and skip_save_training_statistics
        and skip_save_model
        and skip_save_progress
        and skip_save_log
        and skip_save_processed_input
    )

    # The context manager below rebinds `output_directory`; keep the original value,
    # which is what is returned to the caller.
    output_url = output_directory
    with upload_output_directory(output_directory) as (output_directory, upload_fn):
        # Merge per-call callbacks with the model's own callbacks for the duration of this train() call.
        # A new list is built, so `self.callbacks` is left untouched.
        train_callbacks = self.callbacks + (callbacks or [])
        if upload_fn is not None:
            # Upload output files (checkpoints, etc.) to remote storage at the end of
            # each epoch and evaluation, in case of failure in the middle of training.
            class UploadOnEpochEndCallback(Callback):
                def on_eval_end(self, trainer, progress_tracker, save_path):
                    upload_fn()

                def on_epoch_end(self, trainer, progress_tracker, save_path):
                    upload_fn()

            train_callbacks = train_callbacks + [UploadOnEpochEndCallback()]

        # File names are only resolved on the coordinator; elsewhere they stay None.
        description_fn = training_stats_fn = model_dir = None
        if self.backend.is_coordinator():
            if should_create_output_directory:
                makedirs(output_directory, exist_ok=True)
            description_fn, training_stats_fn, model_dir = get_file_names(output_directory)

        # An already-built Dataset plus metadata needs no preprocessing.
        if isinstance(training_set, Dataset) and training_set_metadata is not None:
            preprocessed_data = PreprocessedDataset(training_set, validation_set, test_set, training_set_metadata)
        else:
            # save description
            if self.backend.is_coordinator():
                description = get_experiment_description(
                    self.config_obj.to_dict(),
                    dataset=dataset,
                    training_set=training_set,
                    validation_set=validation_set,
                    test_set=test_set,
                    training_set_metadata=training_set_metadata,
                    data_format=data_format,
                    backend=self.backend,
                    random_seed=random_seed,
                )

                if not skip_save_training_description:
                    save_json(description_fn, description)

                # print description
                experiment_description = [
                    ["Experiment name", experiment_name],
                    ["Model name", model_name],
                    ["Output directory", output_directory],
                ]
                for key, value in description.items():
                    if key != "config":  # Config is printed separately.
                        experiment_description.append([key, pformat(value, indent=4)])

                # NOTE(review): this check is nested inside the identical coordinator
                # check above, so it is always true at this point.
                if self.backend.is_coordinator():
                    print_boxed("EXPERIMENT DESCRIPTION")
                    logger.info(tabulate(experiment_description, tablefmt="fancy_grid"))

                    print_boxed("LUDWIG CONFIG")
                    logger.info("User-specified config (with upgrades):\n")
                    logger.info(pformat(self._user_config, indent=4))
                    # NOTE(review): the path logged below places `experiment_name` under
                    # `output_directory`; confirm it matches where `model_dir` actually points.
                    logger.info(
                        "\nFull config saved to:\n"
                        f"{output_directory}/{experiment_name}/model/model_hyperparameters.json"
                    )

            preprocessed_data = self.preprocess(
                dataset=dataset,
                training_set=training_set,
                validation_set=validation_set,
                test_set=test_set,
                training_set_metadata=training_set_metadata,
                data_format=data_format,
                experiment_name=experiment_name,
                model_name=model_name,
                model_resume_path=model_resume_path,
                skip_save_training_description=skip_save_training_description,
                skip_save_training_statistics=skip_save_training_statistics,
                skip_save_model=skip_save_model,
                skip_save_progress=skip_save_progress,
                skip_save_log=skip_save_log,
                skip_save_processed_input=skip_save_processed_input,
                output_directory=output_directory,
                random_seed=random_seed,
                **kwargs,
            )
            # From here on, work with the preprocessed splits and the resulting metadata.
            training_set = preprocessed_data.training_set
            validation_set = preprocessed_data.validation_set
            test_set = preprocessed_data.test_set
            training_set_metadata = preprocessed_data.training_set_metadata

        self.training_set_metadata = training_set_metadata

        if self.backend.is_coordinator():
            dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)

            if not skip_save_model:
                # save train set metadata
                assert model_dir is not None
                os.makedirs(model_dir, exist_ok=True)
                save_json(os.path.join(model_dir, TRAIN_SET_METADATA_FILE_NAME), training_set_metadata)

            logger.info("\nDataset Statistics")
            logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))

        # Both `experiment_directory` and `output_directory` receive the same path here.
        for callback in train_callbacks:
            callback.on_train_init(
                base_config=self._user_config,
                experiment_directory=output_directory,
                experiment_name=experiment_name,
                model_name=model_name,
                output_directory=output_directory,
                resume_directory=model_resume_path,
            )

        # Build model if not provided
        # if it was provided it means it was already loaded
        if not self.model:
            if self.backend.is_coordinator():
                print_boxed("MODEL")
            # update model config with metadata properties derived from training set
            update_config_with_metadata(self.config_obj, training_set_metadata)
            logger.info("Warnings and other logs:")
            self.model = LudwigModel.create_model(self.config_obj, random_seed=random_seed)
            # update config with properties determined during model instantiation
            update_config_with_model(self.config_obj, self.model)
            set_saved_weights_in_checkpoint_flag(self.config_obj)

        # auto tune learning rate
        # (trainer configs without a `learning_rate` attribute are skipped)
        if hasattr(self.config_obj.trainer, "learning_rate") and self.config_obj.trainer.learning_rate == AUTO:
            detected_learning_rate = get_auto_learning_rate(self.config_obj)
            self.config_obj.trainer.learning_rate = detected_learning_rate

        with self.backend.create_trainer(
            model=self.model,
            config=self.config_obj.trainer,
            resume=model_resume_path is not None,
            skip_save_model=skip_save_model,
            skip_save_progress=skip_save_progress,
            skip_save_log=skip_save_log,
            callbacks=train_callbacks,
            random_seed=random_seed,
        ) as trainer:
            # auto tune batch size
            self._tune_batch_size_and_grad_accum(trainer, training_set, random_seed=random_seed)

            if (
                self.config_obj.model_type == MODEL_LLM
                and trainer.config.type == "none"
                and self.config_obj.adapter is not None
                and self.config_obj.adapter.pretrained_adapter_weights is not None
            ):
                trainer.model.initialize_adapter()  # Load pre-trained adapter weights for inference only

            # train model
            if self.backend.is_coordinator():
                print_boxed("TRAINING")
                if not skip_save_model:
                    self.save_config(model_dir)

            for callback in train_callbacks:
                callback.on_train_start(
                    model=self.model,
                    config=self.config_obj.to_dict(),
                    config_fp=self.config_fp,
                )

            # The `finally` below guarantees on_train_end runs even if training raises.
            try:
                train_stats = trainer.train(
                    training_set,
                    validation_set=validation_set,
                    test_set=test_set,
                    save_path=model_dir,
                )
                # The trainer returns the (possibly replaced) model plus per-split statistics.
                self.model, train_trainset_stats, train_valiset_stats, train_testset_stats = train_stats

                # Calibrate output probabilities and save model (coordinator-only).
                # Must run after training completes, before final model parameters are saved.
                if self.backend.is_coordinator():
                    calibrator = Calibrator(
                        self.model,
                        self.backend,
                        batch_size=trainer.eval_batch_size,
                    )
                    self._run_calibration(calibrator, validation_set, training_set, skip_save_model, model_dir)

                # Evaluation Frequency
                # For ECD models, steps_per_checkpoint takes precedence over checkpoints_per_epoch;
                # everything else reports once per epoch.
                if self.config_obj.model_type == MODEL_ECD and self.config_obj.trainer.steps_per_checkpoint:
                    evaluation_frequency = EvaluationFrequency(
                        self.config_obj.trainer.steps_per_checkpoint, EvaluationFrequency.STEP
                    )
                elif self.config_obj.model_type == MODEL_ECD and self.config_obj.trainer.checkpoints_per_epoch:
                    evaluation_frequency = EvaluationFrequency(
                        1.0 / self.config_obj.trainer.checkpoints_per_epoch, EvaluationFrequency.EPOCH
                    )
                else:
                    evaluation_frequency = EvaluationFrequency(1, EvaluationFrequency.EPOCH)

                # Unpack train()'s return.
                # The statistics are all nested dictionaries of TrainerMetrics: feature_name -> metric_name ->
                # List[TrainerMetric], with one entry per training checkpoint, according to steps_per_checkpoint.
                # We reduce the dictionary of TrainerMetrics to a simple list of floats for interfacing with Ray
                # Tune.
                train_stats = TrainingStats(
                    metric_utils.reduce_trainer_metrics_dict(train_trainset_stats),
                    metric_utils.reduce_trainer_metrics_dict(train_valiset_stats),
                    metric_utils.reduce_trainer_metrics_dict(train_testset_stats),
                    evaluation_frequency,
                )

                # save training statistics
                if self.backend.is_coordinator():
                    if not skip_save_training_statistics:
                        save_json(training_stats_fn, train_stats)

                # results of the model with highest validation test performance
                if (
                    self.backend.is_coordinator()
                    and validation_set is not None
                    and not self.config_obj.trainer.skip_all_evaluation
                ):
                    print_boxed("TRAINING REPORT")
                    training_report = get_training_report(
                        trainer.validation_field,
                        trainer.validation_metric,
                        test_set is not None,
                        train_valiset_stats,
                        train_testset_stats,
                    )
                    logger.info(tabulate(training_report, tablefmt="fancy_grid"))
                    logger.info(f"\nFinished: {experiment_name}_{model_name}")
                    logger.info(f"Saved to: {output_directory}")
            finally:
                for callback in train_callbacks:
                    callback.on_train_end(output_directory)

            # NOTE(review): repeats the assignment made right after preprocessing;
            # `training_set_metadata` is not rebound in between.
            self.training_set_metadata = training_set_metadata

            if self.is_merge_and_unload_set():
                # For an LLM model trained with a LoRA adapter, merge first, then save the full model.
                self.model.merge_and_unload(progressbar=self.config_obj.adapter.postprocessor.progressbar)

                if self.backend.is_coordinator() and not skip_save_model:
                    self.model.save_base_model(model_dir)
            elif self.backend.is_coordinator() and not skip_save_model:
                self.model.save(model_dir)

            # Save model card alongside the model (always)
            # Best-effort: a failure is logged with its traceback and does not abort training.
            if self.backend.is_coordinator() and not skip_save_model:
                try:
                    from ludwig.utils.model_card import save_model_card

                    save_model_card(
                        output_directory=output_directory,
                        config=self.config_obj.to_dict(),
                        training_set_metadata=training_set_metadata,
                        train_stats=train_stats,
                        model_dir=model_dir,
                    )
                except Exception:
                    logger.warning("Failed to generate model card.", exc_info=True)

            # Save training report (always, alongside the model)
            # Best-effort, same as the model card above.
            if self.backend.is_coordinator() and not skip_save_model:
                try:
                    from ludwig.utils.training_report import save_training_report

                    save_training_report(
                        output_directory=output_directory,
                        config=self.config_obj.to_dict(),
                        training_set_metadata=training_set_metadata,
                        train_stats=train_stats,
                        model_dir=model_dir,
                        random_seed=random_seed,
                    )
                except Exception:
                    logger.warning("Failed to generate training report.", exc_info=True)

            # Synchronize model weights between workers
            self.backend.sync_model(self.model)

            print_boxed("FINISHED")
            # `output_url` is the pre-upload location captured before entering the context manager.
            return TrainingResults(train_stats, preprocessed_data, output_url)

train_online

train_online(dataset: str | dict | DataFrame, training_set_metadata: str | dict | None = None, data_format: str = 'auto', random_seed: int = default_random_seed) -> None

Train the model for one epoch on dataset (online / incremental learning).

PARAMETER DESCRIPTION
dataset

Source containing the training data for this epoch.

TYPE: str | dict | DataFrame

training_set_metadata

Pre-computed metadata from a prior run. When None, metadata is derived from the provided dataset.

TYPE: str | dict | None DEFAULT: None

data_format

Format hint for the data source. Inferred when 'auto'.

TYPE: str DEFAULT: 'auto'

random_seed

Seed for data splitting and parameter initialization.

TYPE: int DEFAULT: default_random_seed

Source code in ludwig/api.py
def train_online(
    self,
    dataset: str | dict | pd.DataFrame,
    training_set_metadata: str | dict | None = None,
    data_format: str = "auto",
    random_seed: int = default_random_seed,
) -> None:
    """Train the model for one epoch on `dataset` (online / incremental learning).

    The dataset is preprocessed on every call. The model and the online trainer
    are created on the first call and reused afterwards, so repeated calls keep
    updating the same model.

    Args:
        dataset: Source containing the training data for this epoch.
        training_set_metadata: Pre-computed metadata from a prior run. When
            falsy, the metadata already stored on this model is used; if there
            is none either, metadata is derived from the provided dataset.
        data_format: Format hint for the data source. Inferred when `'auto'`.
        random_seed: Seed for data splitting and parameter initialization.
    """
    # Prefer caller-supplied metadata, then whatever an earlier run stored on the model.
    training_set_metadata = training_set_metadata or self.training_set_metadata
    preprocessing_params = get_preprocessing_params(self.config_obj)

    with provision_preprocessing_workers(self.backend):
        # Only the training split and the resulting metadata are needed here.
        training_dataset, _, _, training_set_metadata = preprocess_for_training(
            self.config_obj,
            training_set=dataset,
            training_set_metadata=training_set_metadata,
            data_format=data_format,
            skip_save_processed_input=True,
            preprocessing_params=preprocessing_params,
            backend=self.backend,
            random_seed=random_seed,
            callbacks=self.callbacks,
        )

    # Metadata from the first run is kept; later calls never overwrite it.
    if not self.training_set_metadata:
        self.training_set_metadata = training_set_metadata

    if not self.model:
        update_config_with_metadata(self.config_obj, training_set_metadata)
        self.model = LudwigModel.create_model(self.config_obj, random_seed=random_seed)
        # update config with properties determined during model instantiation
        update_config_with_model(self.config_obj, self.model)
        set_saved_weights_in_checkpoint_flag(self.config_obj)

    if not self._online_trainer:
        self._online_trainer = self.backend.create_trainer(
            config=self.config_obj.trainer, model=self.model, random_seed=random_seed
        )

        # Tune on the preprocessed dataset, as train() does, rather than on the raw
        # source (which may be a file path, a dict, or a DataFrame).
        self._tune_batch_size_and_grad_accum(self._online_trainer, training_dataset, random_seed=random_seed)

    self.model = self._online_trainer.train_online(training_dataset)

save_dequantized_base_model

save_dequantized_base_model(save_path: str) -> None

Upscales quantized weights of a model to fp16 and saves the result in a specified folder.

PARAMETER DESCRIPTION
save_path

The path to the folder where the upscaled model weights will be saved.

TYPE: str

RAISES DESCRIPTION
ValueError

If the model type is not 'llm', if quantization is not enabled, or if the number of bits is not 4 (8-bit quantized models are not yet supported).

RuntimeError

If no GPU is available, as GPU is required for quantized models.

RETURNS DESCRIPTION
None

None

Source code in ludwig/api.py
def save_dequantized_base_model(self, save_path: str) -> None:
    """Dequantize the base LLM weights (upscaled to fp16) and save them under `save_path`.

    The checks run in order: model type, quantization enabled, 4-bit quantization, GPU availability.
    If no model instance exists yet, one is created from the current config before saving.

    Args:
        save_path (str): Folder where the upscaled model weights will be saved.

    Raises:
        ValueError:
            If the model type is not 'llm', if quantization is not enabled, or if the number of
            quantization bits is not 4 (8-bit models are rejected).
        RuntimeError:
            If no GPU is available, as GPU is required for quantized models.

    Returns:
        None
    """
    cfg = self.config_obj

    model_type = cfg.model_type
    if model_type != MODEL_LLM:
        raise ValueError(
            f"Model type {model_type} is not supported by this method. Only `llm` model type is "
            "supported."
        )

    quantization = cfg.quantization
    if not quantization:
        raise ValueError(
            "Quantization is not enabled in your Ludwig model config. "
            "To enable quantization, set `quantization` to `{'bits': 4}` or `{'bits': 8}` in your model config."
        )
    if quantization.bits != 4:
        raise ValueError(
            "This method only works with quantized models with 4 bits. "
            "Support for 8-bit quantized models will be added in a future release."
        )

    cuda_ready = torch.cuda.is_available()
    if not cuda_ready:
        raise RuntimeError("GPU is required for quantized models but no GPU found.")

    # Lazily build the LLM wrapper when this instance has not created or loaded a model yet.
    if not self.model:
        self.model = LudwigModel.create_model(cfg)

    self.model.save_dequantized_base_model(save_path)

    upload_hint = (
        "If you want to upload this model to huggingface.co, run the following Python commands: \n"
        "from ludwig.utils.hf_utils import upload_folder_to_hfhub; \n"
        f"upload_folder_to_hfhub(repo_id='desired/huggingface/repo/name', folder_path='{save_path}')"
    )
    logger.info(upload_hint)

generate

generate(input_strings: str | list[str], generation_config: dict | None = None, streaming: bool | None = False, callbacks: list[Callback] | None = None) -> str | list[str]

A simple generate() method that directly uses the underlying transformers library to generate text.

PARAMETER DESCRIPTION
input_strings

Input text or list of texts to generate from.

TYPE: str | list[str]

generation_config

Configuration for text generation.

TYPE: dict | None DEFAULT: None

streaming

If True, enable streaming output.

TYPE: bool | None DEFAULT: False

callbacks

Optional callbacks for this generate call.

TYPE: list[Callback] | None DEFAULT: None

RETURNS DESCRIPTION
str | list[str]

Union[str, List[str]]: Generated text or list of generated texts.

Source code in ludwig/api.py
def generate(
    self,
    input_strings: str | list[str],
    generation_config: dict | None = None,
    streaming: bool | None = False,
    callbacks: list[Callback] | None = None,
) -> str | list[str]:
    """Generate text by driving the underlying transformers model directly.

    The inputs are tokenized with a tokenizer built from `config_obj.base_model`, moved to the
    CUDA device, handed to the streaming or non-streaming generation helper, and decoded with
    special tokens skipped.

    Args:
        input_strings: Input text or list of texts to generate from.
        generation_config: Configuration for text generation; applied for the duration of this call.
        streaming: If True, enable streaming output.
        callbacks: Optional callbacks for this generate call. NOTE: this argument is not referenced
            by the current implementation.

    Returns:
        The generated text as a single `str` when exactly one sequence was decoded (this includes a
        one-element list input), otherwise a `list[str]` of generated texts.

    Raises:
        ValueError: If the model type is not `llm`, if no CUDA GPU is available, or if the model has
            not been created yet.
    """
    if self.config_obj.model_type != MODEL_LLM:
        raise ValueError(
            f"Model type {self.config_obj.model_type} is not supported by this method. Only `llm` model type is "
            "supported."
        )
    if not torch.cuda.is_available():
        # GPU is required for loading quantized models. See https://github.com/ludwig-ai/ludwig/issues/3695.
        raise ValueError(
            "A CUDA GPU is required for generate() with quantized LLMs, but none was detected.\n"
            "Either run on a GPU machine or disable quantization in your model config."
        )
    if self.model is None:
        # Without this guard the attribute chain below fails with an opaque AttributeError on None.
        raise ValueError(
            "Model has not been trained or loaded. Call `train()` or `LudwigModel.load()` before `generate()`."
        )

    # Decoder-only models require left-padding for correct generation results (right-padding causes HF warnings).
    padding_side = "left" if not getattr(self.model.model.config, "is_encoder_decoder", False) else "right"
    tokenizer = HFTokenizer(self.config_obj.base_model, padding_side=padding_side)

    with self.model.use_generation_config(generation_config):
        start_time = time.time()
        tokenized_inputs = tokenizer.tokenizer(input_strings, return_tensors="pt", padding=True)
        # Inputs are always placed on the CUDA device; availability was verified above.
        input_ids = tokenized_inputs["input_ids"].to("cuda")
        attention_mask = tokenized_inputs["attention_mask"].to("cuda")

        if streaming:
            streamer = create_text_streamer(tokenizer.tokenizer)
            outputs = self._generate_streaming_outputs(input_strings, input_ids, attention_mask, streamer)
        else:
            outputs = self._generate_non_streaming_outputs(input_strings, input_ids, attention_mask)

        decoded_outputs = tokenizer.tokenizer.batch_decode(outputs, skip_special_tokens=True)
        logger.info(f"Finished generating in: {(time.time() - start_time):.2f}s.")

        # A single decoded sequence is unwrapped to a plain string.
        return decoded_outputs[0] if len(decoded_outputs) == 1 else decoded_outputs

predict

predict(dataset: str | dict | DataFrame | None = None, data_format: str | None = None, split: str = FULL, batch_size: int = 128, generation_config: dict | None = None, skip_save_unprocessed_output: bool = True, skip_save_predictions: bool = True, output_directory: str = 'results', return_type: type = pd.DataFrame, callbacks: list[Callback] | None = None, **kwargs: Any) -> tuple[dict | pd.DataFrame, str]

Make predictions from a trained model on the provided dataset.

PARAMETER DESCRIPTION
dataset

Source containing the dataset to predict on.

TYPE: str | dict | DataFrame | None DEFAULT: None

data_format

Format hint for the data source. Inferred automatically when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

split

Which split of the data to use when the dataset contains a split column. One of 'full', 'training', 'validation', 'test'.

TYPE: str DEFAULT: FULL

batch_size

Number of rows per prediction batch.

TYPE: int DEFAULT: 128

generation_config

LLM-only generation parameters. When None, the config used at training time is applied. Ignored for non-LLM models.

TYPE: dict | None DEFAULT: None

skip_save_unprocessed_output

When False, raw numpy tensors are saved alongside the postprocessed CSV files. When True (default), only CSVs are written.

TYPE: bool DEFAULT: True

skip_save_predictions

Skip writing prediction CSV files.

TYPE: bool DEFAULT: True

output_directory

Root directory for saved prediction outputs.

TYPE: str DEFAULT: 'results'

return_type

Format of the returned predictions (pd.DataFrame or dict).

TYPE: type DEFAULT: DataFrame

callbacks

Extra callbacks for this predict call; combined with any callbacks already registered to the model.

TYPE: list[Callback] | None DEFAULT: None

**kwargs

Forwarded to the underlying predictor.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
tuple[dict | DataFrame, str]

A tuple (predictions, output_directory) where predictions is a pd.DataFrame (or dict) of model outputs and output_directory is the path where results were saved.

Source code in ludwig/api.py
def predict(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int = 128,
    generation_config: dict | None = None,
    skip_save_unprocessed_output: bool = True,
    skip_save_predictions: bool = True,
    output_directory: str = "results",
    return_type: type = pd.DataFrame,
    callbacks: list[Callback] | None = None,
    **kwargs: Any,
) -> tuple[dict | pd.DataFrame, str]:
    """Run batch inference with a trained model and return the postprocessed predictions.

    The dataset is preprocessed without output features, passed through a backend predictor,
    postprocessed, converted to `return_type`, and optionally written to `output_directory`.

    Args:
        dataset: Source containing the dataset to predict on.
        data_format: Format hint for the data source, passed on to preprocessing.
        split: Which split of the data to use, passed on to preprocessing.
        batch_size: Batch size handed to the backend predictor.
        generation_config: Generation parameters applied to the model for the duration of the
            batch prediction call.
        skip_save_unprocessed_output: Passed on to postprocessing; forced to `True` on
            non-coordinator workers.
        skip_save_predictions: When `True`, prediction outputs are not saved.
        output_directory: Directory used for saved outputs. It is only created when at least one
            of the two `skip_save_*` flags is `False`.
        return_type: Target type handed to `convert_predictions` (`pd.DataFrame` or `dict`).
        callbacks: Callbacks passed on to the preprocessing step.
        **kwargs: Accepted but not referenced in this method's body.

    Returns:
        A tuple `(predictions, output_directory)`: the converted predictions and the
        `output_directory` argument as given.
    """
    self._check_initialization()

    # The timer spans preprocessing, inference and postprocessing.
    started_at = time.time()

    logger.debug(f"Preprocessing dataset for prediction (batch_size={batch_size})")
    prepared_dataset, _ = self._preprocess_for_prediction(
        dataset,
        data_format=data_format,
        split=split,
        include_outputs=False,
        callbacks=callbacks,
    )

    logger.debug(f"Running batch prediction (batch_size={batch_size})")
    with self.backend.create_predictor(self.model, batch_size=batch_size) as predictor:
        # The generation config override is scoped to the forward passes only.
        with self.model.use_generation_config(generation_config):
            raw_predictions = predictor.batch_predict(prepared_dataset)

        # Only the coordinator creates the directory, and only if something will be written into it;
        # otherwise an empty directory would be left behind.
        if self.backend.is_coordinator() and not (skip_save_unprocessed_output and skip_save_predictions):
            makedirs(output_directory, exist_ok=True)

        logger.debug("Postprocessing predictions")
        skip_raw_outputs = skip_save_unprocessed_output or not self.backend.is_coordinator()
        processed_predictions = postprocess(
            raw_predictions,
            self.model.output_features,
            self.training_set_metadata,
            output_directory=output_directory,
            backend=self.backend,
            skip_save_unprocessed_output=skip_raw_outputs,
        )
        result = convert_predictions(
            processed_predictions,
            self.model.output_features,
            return_type=return_type,
            backend=self.backend,
        )

        if self.backend.is_coordinator() and not skip_save_predictions:
            save_prediction_outputs(
                processed_predictions,
                self.model.output_features,
                output_directory,
                self.backend,
            )
            logger.info(f"Saved to: {output_directory}")

        logger.info(f"Finished predicting in: {(time.time() - started_at):.2f}s.")
        return result, output_directory

evaluate

evaluate(dataset: str | dict | DataFrame | None = None, data_format: str | None = None, split: str = FULL, batch_size: int | None = None, skip_save_unprocessed_output: bool = True, skip_save_predictions: bool = True, skip_save_eval_stats: bool = True, collect_predictions: bool = False, collect_overall_stats: bool = False, output_directory: str = 'results', return_type: type = pd.DataFrame, **kwargs: Any) -> tuple[dict, dict | pd.DataFrame, str]

Evaluate a trained model and compute performance statistics.

PARAMETER DESCRIPTION
dataset

Source containing the dataset to evaluate.

TYPE: str | dict | DataFrame | None DEFAULT: None

data_format

Format hint for the data source. Inferred automatically when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

split

Which split of the data to use when the dataset contains a split column. One of 'full', 'training', 'validation', 'test'.

TYPE: str DEFAULT: FULL

batch_size

Number of rows per evaluation batch. Defaults to eval_batch_size from the trainer config.

TYPE: int | None DEFAULT: None

skip_save_unprocessed_output

When False, raw numpy tensors are saved alongside postprocessed CSV files. When True (default), only CSVs are written.

TYPE: bool DEFAULT: True

skip_save_predictions

Skip writing prediction CSV files.

TYPE: bool DEFAULT: True

skip_save_eval_stats

Skip writing evaluation statistics JSON.

TYPE: bool DEFAULT: True

collect_predictions

Collect and return postprocessed predictions.

TYPE: bool DEFAULT: False

collect_overall_stats

Compute and include dataset-level aggregate metrics.

TYPE: bool DEFAULT: False

output_directory

Root directory for saved evaluation outputs.

TYPE: str DEFAULT: 'results'

return_type

Format for returned predictions (pd.DataFrame or dict).

TYPE: type DEFAULT: DataFrame

**kwargs

Forwarded to preprocessing.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
tuple[dict, dict | DataFrame, str]

A tuple (eval_stats, predictions, output_directory) where eval_stats is a nested dict of feature → metric → value, predictions is a pd.DataFrame or dict of model outputs, and output_directory is the path where results were saved.

Source code in ludwig/api.py
def evaluate(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int | None = None,
    skip_save_unprocessed_output: bool = True,
    skip_save_predictions: bool = True,
    skip_save_eval_stats: bool = True,
    collect_predictions: bool = False,
    collect_overall_stats: bool = False,
    output_directory: str = "results",
    return_type: type = pd.DataFrame,
    **kwargs: Any,
) -> tuple[dict, dict | pd.DataFrame, str]:
    """Evaluate a trained model on a dataset and return its performance statistics.

    Registered callbacks receive `on_evaluation_start()` before preprocessing and
    `on_evaluation_end()` just before returning.

    Args:
        dataset: Source containing the dataset to evaluate.
        data_format: Format hint for the data source, passed on to preprocessing.
        split: Which split of the data to use, passed on to preprocessing.
        batch_size: Batch size handed to the backend predictor. When `None`, the trainer config's
            `eval_batch_size` is used, falling back to its `batch_size`.
        skip_save_unprocessed_output: Passed on to postprocessing; forced to `True` on
            non-coordinator workers.
        skip_save_predictions: When `True`, prediction outputs are not saved.
        skip_save_eval_stats: When `True`, evaluation statistics are not saved.
        collect_predictions: Postprocess, convert and return the predictions.
        collect_overall_stats: Compute overall stats and merge them into the per-feature stats.
            This also makes the predictor collect predictions.
        output_directory: Directory used for saved outputs. It is only created when at least one
            of the three `skip_save_*` flags is `False`.
        return_type: Target type handed to `convert_predictions` (`pd.DataFrame` or `dict`).
        **kwargs: Accepted but not referenced in this method's body.

    Returns:
        A tuple `(eval_stats, predictions, output_directory)` where `eval_stats` is a nested dict
        of feature → metric → value, `predictions` is the converted model output when
        `collect_predictions` is set (otherwise whatever the predictor returned, unmodified), and
        `output_directory` is the argument as given.

    Raises:
        ValueError: If `batch_size` is `None` and the trainer config provides no default.
    """
    self._check_initialization()

    for cb in self.callbacks:
        cb.on_evaluation_start()

    logger.debug("Preprocessing dataset for evaluation")
    dataset, training_set_metadata = self._preprocess_for_prediction(
        dataset,
        data_format=data_format,
        split=split,
        include_outputs=True,
    )

    if batch_size is None:
        # Go through the dict form because some trainer configs may not define a batch_size field.
        trainer_params = self.config_obj.trainer.to_dict()
        batch_size = trainer_params.get(EVAL_BATCH_SIZE) or trainer_params.get(BATCH_SIZE)
        if batch_size is None:
            raise ValueError(
                "batch_size not specified and no default found in trainer config. "
                "Set batch_size or eval_batch_size in your trainer config."
            )

    logger.debug(f"Running batch evaluation (batch_size={batch_size})")
    with self.backend.create_predictor(self.model, batch_size=batch_size) as predictor:
        # Overall stats are derived from predictions, so they must be collected in that case too.
        need_predictions = collect_predictions or collect_overall_stats
        eval_stats, predictions = predictor.batch_evaluation(
            dataset,
            collect_predictions=need_predictions,
        )

        if collect_overall_stats:
            dataset = dataset.to_df()
            overall_stats = calculate_overall_stats(
                self.model.output_features, predictions, dataset, training_set_metadata
            )

            merged_stats = {}
            for feature_name in eval_stats:
                if feature_name in overall_stats:
                    merged_stats[feature_name] = {**eval_stats[feature_name], **overall_stats[feature_name]}
                else:
                    # e.g. the 'combined' key has no overall-stats counterpart
                    merged_stats[feature_name] = {**eval_stats[feature_name]}
            eval_stats = merged_stats

        # Avoid leaving an empty directory behind when every kind of saving is skipped.
        nothing_to_save = skip_save_unprocessed_output and skip_save_predictions and skip_save_eval_stats
        if self.backend.is_coordinator() and not nothing_to_save:
            makedirs(output_directory, exist_ok=True)

        # Without collect_predictions the predictor's output is passed through untouched.
        postproc_predictions = predictions
        if collect_predictions:
            logger.debug("Postprocessing predictions")
            skip_raw_outputs = skip_save_unprocessed_output or not self.backend.is_coordinator()
            postproc_predictions = postprocess(
                predictions,
                self.model.output_features,
                self.training_set_metadata,
                output_directory=output_directory,
                backend=self.backend,
                skip_save_unprocessed_output=skip_raw_outputs,
            )

        if self.backend.is_coordinator():
            write_predictions = (
                collect_predictions and postproc_predictions is not None and not skip_save_predictions
            )
            if write_predictions:
                save_prediction_outputs(
                    postproc_predictions,
                    self.model.output_features,
                    output_directory,
                    self.backend,
                )

            print_evaluation_stats(eval_stats)
            if not skip_save_eval_stats:
                save_evaluation_stats(eval_stats, output_directory)

            if write_predictions or not skip_save_eval_stats:
                logger.info(f"Saved to: {output_directory}")

        if collect_predictions:
            postproc_predictions = convert_predictions(
                postproc_predictions,
                self.model.output_features,
                return_type=return_type,
                backend=self.backend,
            )

        for cb in self.callbacks:
            cb.on_evaluation_end()

        return eval_stats, postproc_predictions, output_directory

forecast

forecast(dataset: DataFrame, data_format: str | None = None, horizon: int = 1, output_directory: str | None = None, output_format: str = 'parquet', callbacks: list[Callback] | None = None) -> DataFrame

Forecast horizon steps ahead using an iterative single-pass strategy.

Preprocessing is performed once for the initial lookback window. Each subsequent horizon step slides the window by one position using incremental_time_delay_embedding, reducing preprocessing complexity from O(horizon × window_size) to O(window_size + horizon).

Source code in ludwig/api.py
def forecast(
    self,
    dataset: DataFrame,
    data_format: str | None = None,
    horizon: int = 1,
    output_directory: str | None = None,
    output_format: str = "parquet",
    callbacks: list[Callback] | None = None,
) -> DataFrame:
    """Forecast `horizon` steps ahead using an iterative single-pass strategy.

    Preprocessing is performed once for the initial lookback window. Each subsequent horizon step slides the window
    by one position using incremental_time_delay_embedding, reducing preprocessing complexity from
    O(horizon × window_size) to O(window_size + horizon).

    Args:
        dataset: Source of the historical data to forecast from.
        data_format: Format hint used when loading `dataset`.
        horizon: Number of future steps to forecast. A value `<= 0` yields an empty frame with one
            column per timeseries output feature.
        output_directory: When not `None`, the coordinator writes the forecast into this directory
            (created if missing) as `forecast.parquet` or `forecast.csv`.
        output_format: File format for the saved forecast: `"parquet"` or `"csv"`.
        callbacks: Callbacks passed on to the one-time preprocessing of the lookback window.

    Returns:
        A DataFrame with at most `horizon` rows and one column per timeseries output feature.

    Raises:
        ValueError: If the config has no input feature of type `timeseries`, if `horizon > 0` and
            it has no output feature of type `timeseries`, or if results are being saved and
            `output_format` is not supported.
        RuntimeError: If a prediction step yields no rows, since the loop could then never reach
            `horizon`.
    """
    self._check_initialization()

    # Load raw DataFrame once
    dataset, _, _, _ = load_dataset_uris(dataset, None, None, None, self.backend)
    if isinstance(dataset, CacheableDataset):
        dataset = dataset.unwrap()
    df = load_dataset(dataset, data_format=data_format, df_lib=self.backend.df_engine.df_lib)

    ts_input_features = [f for f in self.config_obj.input_features if f.type == TIMESERIES]
    ts_output_features = [f for f in self.config_obj.output_features if f.type == TIMESERIES]
    return_cols = [f.column for f in ts_output_features]

    if not ts_input_features:
        raise ValueError("Forecasting requires at least one input feature of type `timeseries`.")

    if horizon <= 0:
        return pd.DataFrame({col: pd.Series(dtype=float) for col in return_cols})

    if not ts_output_features:
        # Without timeseries outputs every step produces an empty frame, so the prediction loop
        # below would never advance `total_forecasted` and would spin forever.
        raise ValueError("Forecasting requires at least one output feature of type `timeseries`.")

    max_window_size = max(f.preprocessing.window_size for f in ts_input_features)

    # Build a mapping from ts output column name → ts output feature config
    ts_output_by_col = {f.column: f for f in ts_output_features}

    # Step 1: Preprocess the initial lookback window once
    initial_df = df.tail(max_window_size)
    preprocessed, _ = self._preprocess_for_prediction(
        initial_df,
        include_outputs=False,
        callbacks=callbacks,
    )

    # Collect the last preprocessed embedding for each input feature.
    # Non-timeseries features stay constant; timeseries features are slid per step.
    # Keyed by proc_column of the model's input features.
    last_embeddings: dict[str, np.ndarray] = {}
    for i_feat in self.model.input_features.values():
        pc = i_feat.proc_column
        if pc in preprocessed.dataset:
            last_embeddings[pc] = preprocessed.dataset[pc][-1].copy()

    # One (column, proc_column, window_size, padding_value) entry per timeseries input feature
    # that has a preprocessed embedding.
    ts_input_info: list[tuple[str, str, int, float]] = []
    for ts_feat in ts_input_features:
        i_feat = self.model.input_features.get(ts_feat.name)
        if i_feat is not None and i_feat.proc_column in last_embeddings:
            ts_input_info.append(
                (
                    ts_feat.column,
                    i_feat.proc_column,
                    ts_feat.preprocessing.window_size,
                    ts_feat.preprocessing.padding_value,
                )
            )

    # Step 2: Incremental prediction loop — one single-sample prediction per iteration
    predicted_rows: list[pd.DataFrame] = []
    total_forecasted = 0

    with self.backend.create_predictor(self.model, batch_size=1) as predictor:
        while total_forecasted < horizon:
            # Build a single-sample batch from the last embeddings
            batch = {pc: emb[np.newaxis] for pc, emb in last_embeddings.items()}

            # Run model forward pass on one sample, then postprocess
            raw_preds = predictor.predict_single(batch)
            postproc_preds = postprocess(
                raw_preds,
                self.model.output_features,
                self.training_set_metadata,
                backend=self.backend,
                skip_save_unprocessed_output=True,
            )

            # Extract predicted values for each timeseries output feature
            next_series: dict[str, pd.Series] = {}
            for feat in ts_output_features:
                key = f"{feat.name}_predictions"
                next_series[feat.column] = pd.Series(postproc_preds[key].iloc[0])

            next_preds = pd.DataFrame(next_series)
            step_rows = len(next_preds)
            if step_rows == 0:
                # No progress is possible; fail loudly instead of looping forever.
                raise RuntimeError(
                    "Forecast step produced no predictions; cannot make progress toward the requested horizon."
                )
            predicted_rows.append(next_preds)
            total_forecasted += step_rows

            # Step 3: Update embeddings incrementally for the next step.
            # For each timeseries input feature, slide the window by one position.
            for ts_col, proc_col, window_size, padding_value in ts_input_info:
                # Use the predicted value if this ts input is also an output, else padding_value
                # (matches the NaN-fill behavior of the original full-reprocessing path).
                new_val = float(next_preds[ts_col].iloc[-1]) if ts_col in ts_output_by_col else padding_value
                last_embeddings[proc_col] = incremental_time_delay_embedding(
                    new_val, last_embeddings[proc_col], window_size, padding_value
                )

    # A step may yield more than one row, so trim to exactly `horizon`.
    results_df = pd.concat(predicted_rows, ignore_index=True).head(horizon)
    results_df = results_df[return_cols]

    if output_directory is not None and self.backend.is_coordinator():
        if output_format not in ("parquet", "csv"):
            raise ValueError(f"`output_format` {output_format} not supported. Must be one of [parquet, csv]")

        # Create the target directory first, as predict()/evaluate() do, so the write cannot fail
        # merely because the directory does not exist yet.
        makedirs(output_directory, exist_ok=True)
        if output_format == "parquet":
            output_path = os.path.join(output_directory, "forecast.parquet")
            results_df.to_parquet(output_path)
        else:
            output_path = os.path.join(output_directory, "forecast.csv")
            results_df.to_csv(output_path)
        logger.info(f"Saved to: {output_path}")

    return results_df

experiment

experiment(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | None = None, validation_set: str | dict | DataFrame | None = None, test_set: str | dict | DataFrame | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, experiment_name: str = 'experiment', model_name: str = 'run', model_resume_path: str | None = None, eval_split: str = TEST, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, skip_save_unprocessed_output: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_collect_predictions: bool = False, skip_collect_overall_stats: bool = False, output_directory: str = 'results', random_seed: int = default_random_seed, **kwargs: Any) -> tuple[dict | None, TrainingStats, PreprocessedDataset, str]

Train a model and immediately evaluate it on a held-out split.

Combines train() and evaluate() in one call. Saves the model, training statistics, and evaluation results to output_directory.

PARAMETER DESCRIPTION
dataset

Source containing the full dataset. Mutually exclusive with training_set / validation_set / test_set.

TYPE: str | dict | DataFrame | None DEFAULT: None

training_set

Source containing training data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

validation_set

Source containing validation data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

test_set

Source containing test data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

training_set_metadata

Pre-computed metadata dict or path to a .meta.json file from a prior Ludwig run on the same dataset.

TYPE: str | dict | None DEFAULT: None

data_format

Format hint for data sources. Inferred automatically when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

experiment_name

Name used when creating the output directory.

TYPE: str DEFAULT: 'experiment'

model_name

Name used when creating the output directory.

TYPE: str DEFAULT: 'run'

model_resume_path

Resume training from this checkpoint directory.

TYPE: str | None DEFAULT: None

eval_split

Which split to evaluate after training. One of 'training', 'validation', 'test'.

TYPE: str DEFAULT: TEST

skip_save_training_description

Skip saving the experiment description JSON.

TYPE: bool DEFAULT: False

skip_save_training_statistics

Skip saving training statistics JSON.

TYPE: bool DEFAULT: False

skip_save_model

Skip saving model weights after each improvement.

TYPE: bool DEFAULT: False

skip_save_progress

Skip saving per-epoch checkpoints for resuming.

TYPE: bool DEFAULT: False

skip_save_log

Skip saving TensorBoard logs.

TYPE: bool DEFAULT: False

skip_save_processed_input

Skip caching the preprocessed HDF5/JSON files.

TYPE: bool DEFAULT: False

skip_save_unprocessed_output

Skip saving raw numpy prediction tensors.

TYPE: bool DEFAULT: False

skip_save_predictions

Skip writing prediction CSV files.

TYPE: bool DEFAULT: False

skip_save_eval_stats

Skip writing evaluation statistics JSON.

TYPE: bool DEFAULT: False

skip_collect_predictions

Do not collect postprocessed predictions.

TYPE: bool DEFAULT: False

skip_collect_overall_stats

Do not compute dataset-level aggregate metrics.

TYPE: bool DEFAULT: False

output_directory

Root directory for all saved outputs.

TYPE: str DEFAULT: 'results'

random_seed

Seed for weight initialization, data splitting, and shuffling.

TYPE: int DEFAULT: default_random_seed

**kwargs

Forwarded to preprocessing.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
tuple[dict | None, TrainingStats, PreprocessedDataset, str]

A tuple (eval_stats, train_stats, preprocessed_data, output_directory) where eval_stats is performance metrics on the eval split (or None if eval was skipped), train_stats is per-epoch training metrics, preprocessed_data holds the three split datasets, and output_directory is where results were saved.

Source code in ludwig/api.py
def experiment(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | None = None,
    validation_set: str | dict | pd.DataFrame | None = None,
    test_set: str | dict | pd.DataFrame | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    experiment_name: str = "experiment",
    model_name: str = "run",
    model_resume_path: str | None = None,
    eval_split: str = TEST,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = False,
    skip_save_unprocessed_output: bool = False,
    skip_save_predictions: bool = False,
    skip_save_eval_stats: bool = False,
    skip_collect_predictions: bool = False,
    skip_collect_overall_stats: bool = False,
    output_directory: str = "results",
    random_seed: int = default_random_seed,
    **kwargs: Any,
) -> tuple[dict | None, TrainingStats, PreprocessedDataset, str]:
    """Train a model and immediately evaluate it on a held-out split.

    Combines `train()` and `evaluate()` in one call. Saves the model,
    training statistics, and evaluation results to `output_directory`.

    Args:
        dataset: Source containing the full dataset. Mutually exclusive with
            `training_set` / `validation_set` / `test_set`.
        training_set: Source containing training data only.
        validation_set: Source containing validation data only.
        test_set: Source containing test data only.
        training_set_metadata: Pre-computed metadata dict or path to a `.meta.json`
            file from a prior Ludwig run on the same dataset.
        data_format: Format hint for data sources. Inferred automatically when `None`.
            Valid values: `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
            `'fwf'`, `'hdf5'`, `'html'`, `'json'`, `'jsonl'`, `'parquet'`, `'pickle'`,
            `'sas'`, `'spss'`, `'stata'`, `'tsv'`.
        experiment_name: Name used when creating the output directory.
        model_name: Name used when creating the output directory.
        model_resume_path: Resume training from this checkpoint directory.
        eval_split: Which split to evaluate after training. One of `'training'`,
            `'validation'`, `'test'`. Any other value logs a warning and falls
            back to the validation split.
        skip_save_training_description: Skip saving the experiment description JSON.
        skip_save_training_statistics: Skip saving training statistics JSON.
        skip_save_model: Skip saving model weights after each improvement.
        skip_save_progress: Skip saving per-epoch checkpoints for resuming.
        skip_save_log: Skip saving TensorBoard logs.
        skip_save_processed_input: Skip caching the preprocessed HDF5/JSON files.
        skip_save_unprocessed_output: Skip saving raw numpy prediction tensors.
        skip_save_predictions: Skip writing prediction CSV files.
        skip_save_eval_stats: Skip writing evaluation statistics JSON.
        skip_collect_predictions: Do not collect postprocessed predictions.
        skip_collect_overall_stats: Do not compute dataset-level aggregate metrics.
        output_directory: Root directory for all saved outputs.
        random_seed: Seed for weight initialization, data splitting, and shuffling.
        **kwargs: Accepted for API compatibility. NOTE(review): these are not
            currently passed on to `train()` or `evaluate()` by this method —
            confirm whether they should be forwarded to preprocessing.

    Returns:
        A tuple `(eval_stats, train_stats, preprocessed_data, output_directory)` where
        `eval_stats` is performance metrics on the eval split (or `None` if eval was
        skipped), `train_stats` is per-epoch training metrics, `preprocessed_data`
        holds the three split datasets, and `output_directory` is where results were
        saved.
    """
    if self._user_config.get(HYPEROPT):
        print_boxed("WARNING")
        logger.warning(HYPEROPT_WARNING)

    train_result = self.train(
        dataset=dataset,
        training_set=training_set,
        validation_set=validation_set,
        test_set=test_set,
        training_set_metadata=training_set_metadata,
        data_format=data_format,
        experiment_name=experiment_name,
        model_name=model_name,
        model_resume_path=model_resume_path,
        skip_save_training_description=skip_save_training_description,
        skip_save_training_statistics=skip_save_training_statistics,
        skip_save_model=skip_save_model,
        skip_save_progress=skip_save_progress,
        skip_save_log=skip_save_log,
        skip_save_processed_input=skip_save_processed_input,
        skip_save_unprocessed_output=skip_save_unprocessed_output,
        output_directory=output_directory,
        random_seed=random_seed,
    )
    train_stats = train_result.train_stats
    preprocessed_data = train_result.preprocessed_data
    # From here on, use the directory train() actually wrote to.
    output_directory = train_result.output_directory

    # Track which split ends up being evaluated so that log messages can name it.
    resolved_split = eval_split
    if eval_split == TRAINING:
        eval_set = preprocessed_data.training_set
    elif eval_split == VALIDATION:
        eval_set = preprocessed_data.validation_set
    elif eval_split == TEST:
        eval_set = preprocessed_data.test_set
    else:
        logger.warning(f"Eval split {eval_split} not supported. Using validation set instead")
        resolved_split = VALIDATION
        eval_set = preprocessed_data.validation_set

    if eval_set is not None:
        # Prefer the dedicated eval batch size; fall back to the training batch size.
        trainer_dict = self.config_obj.trainer.to_dict()
        batch_size = trainer_dict.get(EVAL_BATCH_SIZE, trainer_dict.get(BATCH_SIZE, None))

        # evaluate
        try:
            eval_stats, _, _ = self.evaluate(
                eval_set,
                data_format=data_format,
                batch_size=batch_size,
                output_directory=output_directory,
                skip_save_unprocessed_output=skip_save_unprocessed_output,
                skip_save_predictions=skip_save_predictions,
                skip_save_eval_stats=skip_save_eval_stats,
                collect_predictions=not skip_collect_predictions,
                collect_overall_stats=not skip_collect_overall_stats,
                return_type="dict",
            )
        except NotImplementedError:
            # Best-effort: an evaluation path that is not implemented must not
            # discard the training results, so log and carry on without stats.
            logger.warning(
                "Skipping evaluation as the necessary methods are not "
                "supported. Full exception below:\n"
                f"{traceback.format_exc()}"
            )
            eval_stats = None
    else:
        # Name the split rather than interpolating `eval_set`, which is always
        # None on this branch.
        logger.warning(f"The evaluation set {resolved_split} was not provided. Skipping evaluation")
        eval_stats = None

    return eval_stats, train_stats, preprocessed_data, output_directory

collect_weights

collect_weights(tensor_names: list[str] | None = None, **kwargs: Any) -> list

Return the named tensors (weight matrices) from the trained model.

PARAMETER DESCRIPTION
tensor_names

Names of tensors to retrieve. When None, all tensors are returned.

TYPE: list[str] | None DEFAULT: None

**kwargs

Unused; accepted for forward-compatibility.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list

List of (name, tensor) tuples.

Source code in ludwig/api.py
def collect_weights(self, tensor_names: list[str] | None = None, **kwargs: Any) -> list:
    """Return the named tensors (weight matrices) from the trained model.

    Args:
        tensor_names: Names of tensors to retrieve. When `None`, all tensors
            are returned.
        **kwargs: Unused; accepted for forward-compatibility.

    Returns:
        List of `(name, tensor)` tuples.
    """
    self._check_initialization()
    collected_tensors = self.model.collect_weights(tensor_names)
    return collected_tensors

collect_activations

collect_activations(layer_names: list[str], dataset: str | dict[str, list] | DataFrame, data_format: str | None = None, split: str = FULL, batch_size: int = 128, **kwargs: Any) -> list

Collect intermediate-layer activations for the given dataset.

PARAMETER DESCRIPTION
layer_names

Names of layers in the model to collect activations from.

TYPE: list[str]

dataset

Source containing the data to run through the model.

TYPE: str | dict[str, list] | DataFrame

data_format

Format hint for the data source. Inferred when None.

TYPE: str | None DEFAULT: None

split

Which data split to use when the dataset has a split column. One of 'full', 'training', 'validation', 'test'.

TYPE: str DEFAULT: FULL

batch_size

Number of rows per inference batch.

TYPE: int DEFAULT: 128

**kwargs

Unused; accepted for forward-compatibility.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list

List of activation tensors, one per layer name.

Source code in ludwig/api.py
def collect_activations(
    self,
    layer_names: list[str],
    dataset: str | dict[str, list] | pd.DataFrame,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int = 128,
    **kwargs: Any,
) -> list:
    """Run a dataset through the model and gather activations of the named layers.

    Args:
        layer_names: Names of the model layers whose activations are wanted.
        dataset: Source containing the data to feed through the model.
        data_format: Format hint for the data source. Inferred when `None`.
        split: Which data split to use when the dataset has a split column.
            One of `'full'`, `'training'`, `'validation'`, `'test'`.
        batch_size: Number of rows per inference batch.
        **kwargs: Ignored; accepted for forward-compatibility.

    Returns:
        List of activation tensors, one per layer name.
    """
    self._check_initialization()

    # Preprocess inputs only; output features are not needed here.
    logger.debug("Preprocessing dataset for activation collection")
    prepared_dataset, _ = self._preprocess_for_prediction(
        dataset, data_format=data_format, split=split, include_outputs=False
    )

    logger.debug(f"Collecting activations for layers: {layer_names} (batch_size={batch_size})")
    predictor_ctx = self.backend.create_predictor(self.model, batch_size=batch_size)
    with predictor_ctx as predictor:
        return predictor.batch_collect_activations(layer_names, prepared_dataset)

preprocess

preprocess(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | None = None, validation_set: str | dict | DataFrame | None = None, test_set: str | dict | DataFrame | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, skip_save_processed_input: bool = True, random_seed: int = default_random_seed, **kwargs: Any) -> PreprocessedDataset

Preprocess a dataset and return it split into training / validation / test sets.

PARAMETER DESCRIPTION
dataset

Source containing the full dataset. Mutually exclusive with training_set / validation_set / test_set.

TYPE: str | dict | DataFrame | None DEFAULT: None

training_set

Source containing training data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

validation_set

Source containing validation data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

test_set

Source containing test data only.

TYPE: str | dict | DataFrame | None DEFAULT: None

training_set_metadata

Pre-computed metadata dict or .meta.json path from a prior Ludwig run on the same dataset.

TYPE: str | dict | None DEFAULT: None

data_format

Format hint for data sources. Inferred when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

skip_save_processed_input

Skip caching the preprocessed HDF5/JSON files.

TYPE: bool DEFAULT: True

random_seed

Seed for data splitting and shuffling.

TYPE: int DEFAULT: default_random_seed

**kwargs

Forwarded to the underlying preprocessing function.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
PreprocessedDataset

A PreprocessedDataset namedtuple with fields training_set, validation_set, test_set, and training_set_metadata.

RAISES DESCRIPTION
RuntimeError

If preprocessing fails (e.g., empty training set after filtering, or lazy loading incompatible with RayBackend).

Source code in ludwig/api.py
def preprocess(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | None = None,
    validation_set: str | dict | pd.DataFrame | None = None,
    test_set: str | dict | pd.DataFrame | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    skip_save_processed_input: bool = True,
    random_seed: int = default_random_seed,
    **kwargs: Any,
) -> PreprocessedDataset:
    """Run training-time preprocessing and return the resulting data splits.

    Callbacks are notified before preprocessing starts and again when it ends.
    The end notification fires on failure too, in which case the split
    arguments are `None` and the metadata is whatever the caller passed in.

    Args:
        dataset: Source containing the full dataset. Mutually exclusive with
            `training_set` / `validation_set` / `test_set`.
        training_set: Source containing training data only.
        validation_set: Source containing validation data only.
        test_set: Source containing test data only.
        training_set_metadata: Pre-computed metadata dict or `.meta.json` path
            from a prior Ludwig run on the same dataset.
        data_format: Format hint for data sources. Inferred when `None`.
            Valid values: `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`,
            `'feather'`, `'fwf'`, `'hdf5'`, `'html'`, `'json'`, `'jsonl'`,
            `'parquet'`, `'pickle'`, `'sas'`, `'spss'`, `'stata'`, `'tsv'`.
        skip_save_processed_input: Skip caching the preprocessed HDF5/JSON files.
        random_seed: Seed for data splitting and shuffling.
        **kwargs: Accepted by the signature; not referenced in this method body.

    Returns:
        A `PreprocessedDataset` with fields `training_set`, `validation_set`,
        `test_set`, and `training_set_metadata`.

    Raises:
        Exception: Whatever preprocessing raises is logged at debug level and
            re-raised unchanged. Documented cases are a `RuntimeError` for an
            empty training set after filtering, or lazy loading that is
            incompatible with RayBackend.
    """
    print_boxed("PREPROCESSING")

    for cb in self.callbacks:
        cb.on_preprocess_start(self.config_obj.to_dict())

    params = get_preprocessing_params(self.config_obj)

    # Placeholders so the `finally` hook has values to report if preprocessing
    # fails before the real splits exist.
    train_split = val_split = test_split = None
    metadata = training_set_metadata
    try:
        with provision_preprocessing_workers(self.backend):
            result = preprocess_for_training(
                self.config_obj,
                dataset=dataset,
                training_set=training_set,
                validation_set=validation_set,
                test_set=test_set,
                training_set_metadata=training_set_metadata,
                data_format=data_format,
                skip_save_processed_input=skip_save_processed_input,
                preprocessing_params=params,
                backend=self.backend,
                random_seed=random_seed,
                callbacks=self.callbacks,
            )

        train_split, val_split, test_split, metadata = result

        return PreprocessedDataset(train_split, val_split, test_split, metadata)
    except Exception:
        # Keep the traceback in the debug log, then let the caller handle it.
        logger.debug(traceback.format_exc())
        raise
    finally:
        for cb in self.callbacks:
            cb.on_preprocess_end(train_split, val_split, test_split, metadata)

load staticmethod

load(model_dir: str, logging_level: int = logging.ERROR, backend: Backend | str | None = None, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] | None = None, from_checkpoint: bool = False) -> LudwigModel

Load a previously trained LudwigModel from disk.

PARAMETER DESCRIPTION
model_dir

Path to the saved model directory (typically results/<experiment>/<model>/model/).

TYPE: str

logging_level

Log level sent to stderr (e.g., logging.INFO).

TYPE: int DEFAULT: ERROR

backend

Backend instance or string name used for preprocessing.

TYPE: Backend | str | None DEFAULT: None

gpus

GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.

TYPE: str | int | list[int] | None DEFAULT: None

gpu_memory_limit

Maximum memory fraction [0, 1] allowed per GPU.

TYPE: float | None DEFAULT: None

allow_parallel_threads

Allow Torch multi-threading for performance at the cost of determinism.

TYPE: bool DEFAULT: True

callbacks

List of Callback objects providing hooks into the pipeline.

TYPE: list[Callback] | None DEFAULT: None

from_checkpoint

When True, load from the latest training checkpoint in training_checkpoints/ instead of the final model weights.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
LudwigModel

A fully initialized LudwigModel ready for inference.

model = LudwigModel.load("results/experiment/run/model")
predictions, _ = model.predict(dataset=df)
Source code in ludwig/api.py
@staticmethod
def load(
    model_dir: str,
    logging_level: int = logging.ERROR,
    backend: Backend | str | None = None,
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] | None = None,
    from_checkpoint: bool = False,
) -> "LudwigModel":  # return is an instance of ludwig.api.LudwigModel class
    """Load a previously trained LudwigModel from disk.

    The saved config is read from `model_dir`, a `LudwigModel` is built from
    it, the weights are loaded, and finally the training set metadata is
    attached to the returned instance.

    Args:
        model_dir: Path to the saved model directory (typically
            `results/<experiment>/<model>/model/`).
        logging_level: Log level sent to stderr (e.g., `logging.INFO`).
        backend: Backend instance or string name used for preprocessing. When
            `None` and the saved config has a `"backend"` entry, the backend
            from the saved config is used instead.
        gpus: GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.
        gpu_memory_limit: Maximum memory fraction [0, 1] allowed per GPU.
        allow_parallel_threads: Allow Torch multi-threading for performance
            at the cost of determinism.
        callbacks: List of `Callback` objects providing hooks into the pipeline.
        from_checkpoint: When `True`, load from the latest training checkpoint
            in `training_checkpoints/` instead of the final model weights.

    Returns:
        A fully initialized `LudwigModel` ready for inference.

    Example::

        model = LudwigModel.load("results/experiment/run/model")
        predictions, _ = model.predict(dataset=df)
    """
    # Initialize PyTorch before calling `broadcast()` to prevent initializing
    # Torch with default parameters
    # Remember what the caller passed so we can tell "no backend requested"
    # apart from the default that `initialize_backend` resolves to.
    backend_param = backend
    backend = initialize_backend(backend)
    backend.initialize_pytorch(
        gpus=gpus, gpu_memory_limit=gpu_memory_limit, allow_parallel_threads=allow_parallel_threads
    )

    logger.info(f"Loading model from {model_dir}")
    # Read the saved hyperparameters JSON through the backend's broadcast helper.
    config = backend.broadcast_return(lambda: load_json(os.path.join(model_dir, MODEL_HYPERPARAMETERS_FILE_NAME)))

    # Upgrades deprecated fields and adds new required fields in case the config loaded from disk is old.
    config_obj = ModelConfig.from_dict(config)

    # If the caller did not pass a backend explicitly, fall back to the backend
    # recorded in the saved config (when there is one).
    if backend_param is None and "backend" in config:
        # Reset backend from config
        backend = initialize_backend(config.get("backend"))

    # initialize model
    ludwig_model = LudwigModel(
        config_obj.to_dict(),
        logging_level=logging_level,
        backend=backend,
        gpus=gpus,
        gpu_memory_limit=gpu_memory_limit,
        allow_parallel_threads=allow_parallel_threads,
        callbacks=callbacks,
    )

    # generate model from config
    set_saved_weights_in_checkpoint_flag(config_obj)
    ludwig_model._get_or_create_model(config_obj)

    # load model weights
    logger.info(f"Loading model weights from {model_dir}")
    ludwig_model.load_weights(model_dir, from_checkpoint)

    # If merge_and_unload was NOT performed before saving (i.e., adapter weights exist),
    # we need to merge them now for inference.
    # The presence of `adapter_config.json` under the weights path is the signal
    # that unmerged adapter weights were saved.
    if ludwig_model.is_merge_and_unload_set():
        weights_save_path = os.path.join(model_dir, MODEL_WEIGHTS_FILE_NAME)
        adapter_config_path = os.path.join(weights_save_path, "adapter_config.json")
        if os.path.exists(adapter_config_path):
            ludwig_model.model.merge_and_unload(progressbar=config_obj.adapter.postprocessor.progressbar)

    # load train set metadata
    ludwig_model.training_set_metadata = backend.broadcast_return(
        lambda: load_metadata(os.path.join(model_dir, TRAIN_SET_METADATA_FILE_NAME))
    )

    return ludwig_model

load_weights

load_weights(model_dir: str, from_checkpoint: bool = False) -> None

Load model weights from a saved model directory.

PARAMETER DESCRIPTION
model_dir

Path to the saved model directory.

TYPE: str

from_checkpoint

When True, load from the latest training checkpoint instead of the final model weights.

TYPE: bool DEFAULT: False

Source code in ludwig/api.py
def load_weights(
    self,
    model_dir: str,
    from_checkpoint: bool = False,
) -> None:
    """Restore model weights from `model_dir` and sync them via the backend.

    Only the coordinator reads from disk. `backend.sync_model` is then called
    unconditionally, whether or not this process is the coordinator.

    Args:
        model_dir: Path to the saved model directory.
        from_checkpoint: When `True`, load from the latest training checkpoint
            instead of the final model weights.
    """
    on_coordinator = self.backend.is_coordinator()

    if on_coordinator and not from_checkpoint:
        # Regular path: the model loads its own final weights.
        self.model.load(model_dir)
    elif on_coordinator:
        # Checkpoint path: a trainer restores weights and optimizer state.
        trainer_ctx = self.backend.create_trainer(model=self.model, config=self.config_obj.trainer)
        with trainer_ctx as trainer:
            handle = trainer.create_checkpoint_handle()
            checkpoints_dir = os.path.join(model_dir, TRAINING_CHECKPOINTS_DIR_PATH)
            trainer.resume_weights_and_optimizer(checkpoints_dir, handle)

    self.backend.sync_model(self.model)

save

save(save_path: str) -> None

Save the model config, weights, and training metadata to save_path.

PARAMETER DESCRIPTION
save_path

Directory where the model will be saved. Created if it does not exist. Contains model_hyperparameters.json, weight files, and training_set_metadata.json.

TYPE: str

Source code in ludwig/api.py
def save(self, save_path: str) -> None:
    """Persist the config, the model weights, and the training set metadata.

    Args:
        save_path: Directory to save into. `save_config` creates it if it does
            not exist. It receives the config JSON, the files written by the
            model's own `save`, and the training set metadata JSON.
    """
    self._check_initialization()

    # Order: config first (this also creates the directory), then weights,
    # then the training set metadata.
    self.save_config(save_path)
    self.model.save(save_path)
    save_json(
        os.path.join(save_path, TRAIN_SET_METADATA_FILE_NAME),
        self.training_set_metadata,
    )

upload_to_hf_hub staticmethod

upload_to_hf_hub(repo_id: str, model_path: str, repo_type: str = 'model', private: bool = False, commit_message: str = 'Upload trained [Ludwig](https://ludwig.ai/latest/) model weights', commit_description: str | None = None) -> bool

Uploads trained model artifacts to the HuggingFace Hub.

PARAMETER DESCRIPTION
repo_id

A namespace (user or an organization) and a repo name separated by a /.

TYPE: str

model_path

The path of the saved model. This is either (a) the folder where the 'model_weights' folder and the 'model_hyperparameters.json' file are stored, or (b) the parent of that folder.

TYPE: str

private

Whether the model repo should be private. Defaults to False.

TYPE: bool DEFAULT: False

repo_type

Set to "dataset" or "space" if uploading to a dataset or space, None or "model" if uploading to a model. Default is "model".

TYPE: str DEFAULT: 'model'

commit_message

The summary / title / first line of the generated commit.

TYPE: str DEFAULT: 'Upload trained [Ludwig](https://ludwig.ai/latest/) model weights'

commit_description

The description of the generated commit.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
bool

True for success, False for failure.

Source code in ludwig/api.py
@staticmethod
def upload_to_hf_hub(
    repo_id: str,
    model_path: str,
    repo_type: str = "model",
    private: bool = False,
    commit_message: str = "Upload trained [Ludwig](https://ludwig.ai/latest/) model weights",
    commit_description: str | None = None,
) -> bool:
    """Uploads trained model artifacts to the HuggingFace Hub.

    Args:
        repo_id: A namespace (user or an organization) and a repo name separated by a `/`.
        model_path: The path of the saved model. This is either (a) the folder where the 'model_weights'
            folder and the 'model_hyperparameters.json' file are stored, or (b) the parent of that folder.
            A trailing path separator is tolerated.
        repo_type: Set to `"dataset"` or `"space"` if uploading to a dataset or space, `None` or `"model"`
            if uploading to a model. Default is `"model"`.
        private: Whether the model repo should be private. Defaults to False.
        commit_message: The summary / title / first line of the generated commit.
        commit_description: The description of the generated commit.

    Returns:
        True for success, False for failure.

    Raises:
        ValueError: If neither `model_path` nor its model subdirectory contains both the model weights
            and the hyperparameters file.
    """
    if model_weights_exist(os.path.join(model_path, MODEL_FILE_NAME)) and os.path.exists(
        os.path.join(model_path, MODEL_FILE_NAME, MODEL_HYPERPARAMETERS_FILE_NAME)
    ):
        # Case (b): `model_path` is already the experiment directory.
        experiment_path = model_path
    elif model_weights_exist(model_path) and os.path.exists(
        os.path.join(model_path, MODEL_HYPERPARAMETERS_FILE_NAME)
    ):
        # Case (a): `model_path` is the model directory, so upload its parent.
        # Strip trailing separators first: os.path.dirname("a/b/model/") is
        # "a/b/model", which would otherwise point back at the model directory.
        trimmed_path = model_path.rstrip(os.sep + (os.altsep or ""))
        experiment_path = os.path.dirname(trimmed_path or model_path)
    else:
        raise ValueError(
            f"Can't find model weights and '{MODEL_HYPERPARAMETERS_FILE_NAME}' either at "
            f"'{model_path}' or at '{model_path}/model'"
        )
    model_service = get_upload_registry()["hf_hub"]
    hub: HuggingFaceHub = model_service()
    hub.login()
    upload_status: bool = hub.upload(
        repo_id=repo_id,
        model_path=experiment_path,
        repo_type=repo_type,
        private=private,
        commit_message=commit_message,
        commit_description=commit_description,
    )
    return upload_status

save_config

save_config(save_path: str) -> None

Save config to specified location.

PARAMETER DESCRIPTION
save_path

Directory in which the config is saved as a JSON file; created if it does not exist.

TYPE: str

Source code in ludwig/api.py
def save_config(self, save_path: str) -> None:
    """Write the model config as JSON inside the `save_path` directory.

    Args:
        save_path: Directory that receives the config file. It is created,
            including missing parents, if it does not exist yet.
    """
    os.makedirs(save_path, exist_ok=True)
    save_json(
        os.path.join(save_path, MODEL_HYPERPARAMETERS_FILE_NAME),
        self.config_obj.to_dict(),
    )

export_model

export_model(save_path: str, format: str = 'safetensors', sample_input: dict | None = None) -> None

Export the model in various formats.

PARAMETER DESCRIPTION
save_path

Directory to save the exported model.

TYPE: str

format

Export format. One of "safetensors", "torch_export", "onnx".

TYPE: str DEFAULT: 'safetensors'

sample_input

Example input for tracing (required for torch_export and onnx).

TYPE: dict | None DEFAULT: None

Source code in ludwig/api.py
def export_model(self, save_path: str, format: str = "safetensors", sample_input: dict | None = None) -> None:
    """Export the model to `save_path` in the requested format.

    Args:
        save_path: Directory to save the exported model.
        format: Export format. One of "safetensors", "torch_export", "onnx".
        sample_input: Example input for tracing (required for torch_export and onnx).

    Raises:
        ValueError: If `format` is not one of the supported options.
    """
    # Imported lazily, inside the method, as in the original implementation.
    from ludwig.utils.model_export import ModelExporter

    exporter = ModelExporter(self.model)

    if format == "safetensors":
        return exporter.export_safetensors(save_path)
    if format == "torch_export":
        return exporter.export_torch(save_path, sample_input)
    if format == "onnx":
        return exporter.export_onnx(save_path, sample_input)
    raise ValueError(f"Unknown export format: {format}. Options: safetensors, torch_export, onnx")

free_gpu_memory

free_gpu_memory() -> None

Manually moves the model to CPU to force GPU memory to be freed.

For more context: https://discuss.pytorch.org/t/how-can-we-release-gpu-memory-cache/14530/35

Source code in ludwig/api.py
def free_gpu_memory(self) -> None:
    """Move the wrapped model to CPU and empty the CUDA cache.

    Does nothing when CUDA is not available.

    For more context: https://discuss.pytorch.org/t/how-can-we-release-gpu-memory-cache/14530/35
    """
    if not torch.cuda.is_available():
        return

    cpu_device = torch.device("cpu")
    self.model.model.to(cpu_device)
    torch.cuda.empty_cache()

create_model staticmethod

create_model(config_obj: ModelConfig | dict, random_seed: int = default_random_seed) -> BaseModel

Instantiates BaseModel object.

PARAMETER DESCRIPTION
config_obj

Ludwig config object.

TYPE: ModelConfig | dict

random_seed

Random seed used for weights initialization, splits and any other random function.

TYPE: int DEFAULT: default_random_seed

RETURNS DESCRIPTION
BaseModel

Instance of the Ludwig model object.

Source code in ludwig/api.py
@staticmethod
def create_model(config_obj: ModelConfig | dict, random_seed: int = default_random_seed) -> BaseModel:
    """Build the model object described by a Ludwig config.

    Args:
        config_obj: Ludwig config, either a `ModelConfig` or a plain dict. A
            dict is converted with `ModelConfig.from_dict` first.
        random_seed: Random seed used for weights initialization, splits and any other random function.

    Returns:
        Instance of the model class registered for the config's `model_type`.
    """
    resolved_config = ModelConfig.from_dict(config_obj) if isinstance(config_obj, dict) else config_obj
    model_cls = get_from_registry(resolved_config.model_type, model_type_registry)
    return model_cls(resolved_config, random_seed=random_seed)

set_logging_level staticmethod

set_logging_level(logging_level: int) -> None

Sets level for log messages.

PARAMETER DESCRIPTION
logging_level

Set/Update the logging level. Use logging constants like logging.DEBUG, logging.INFO and logging.ERROR.

TYPE: int

Source code in ludwig/api.py
@staticmethod
def set_logging_level(logging_level: int) -> None:
    """Set the level of the "ludwig" logger and toggle progress bars to match.

    Progress bars are disabled for `logging.WARNING`, `logging.ERROR` and
    `logging.CRITICAL`, and enabled for every other level.

    Args:
        logging_level: Set/Update the logging level. Use logging constants like `logging.DEBUG`,
            `logging.INFO` and `logging.ERROR`.
    """
    logging.getLogger("ludwig").setLevel(logging_level)
    quiet_levels = {logging.WARNING, logging.ERROR, logging.CRITICAL}
    set_disable_progressbar(logging_level in quiet_levels)

is_merge_and_unload_set

is_merge_and_unload_set() -> bool

Return True if this model is an LLM configured to merge_and_unload QLoRA adapter weights.

Source code in ludwig/api.py
def is_merge_and_unload_set(self) -> bool:
    """Report whether this is an LLM configured to merge_and_unload QLoRA adapter weights.

    Returns:
        False when the config's model type is not `MODEL_LLM`; otherwise whatever the
        underlying model's own `is_merge_and_unload_set()` reports.
    """
    # Only LLM models are asked; the wrapped model is not consulted for other types.
    if self.config_obj.model_type != MODEL_LLM:
        return False
    return self.model.is_merge_and_unload_set()

ludwig.api.kfold_cross_validate

kfold_cross_validate(num_folds: int, config: dict | str, dataset: str | None = None, data_format: str | None = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_collect_predictions: bool = False, skip_collect_overall_stats: bool = False, output_directory: str = 'results', random_seed: int = default_random_seed, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, backend: Backend | str | None = None, logging_level: int = logging.INFO, **kwargs: Any) -> tuple[dict, dict]

Perform k-fold cross-validation and return aggregated metrics.

PARAMETER DESCRIPTION
num_folds

Number of folds for cross-validation.

TYPE: int

config

Model config dict or path to a YAML config file.

TYPE: dict | str

dataset

Source containing the full dataset. Note: 'hdf5' format is not supported for k-fold cross-validation.

TYPE: str | None DEFAULT: None

data_format

Format hint for the data source. Inferred automatically when None. Valid values: 'auto', 'csv', 'df', 'dict', 'excel', 'feather', 'fwf', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

skip_save_training_description

Skip saving the experiment description JSON.

TYPE: bool DEFAULT: False

skip_save_training_statistics

Skip saving training statistics JSON.

TYPE: bool DEFAULT: False

skip_save_model

Skip saving model weights after each improvement.

TYPE: bool DEFAULT: False

skip_save_progress

Skip saving per-epoch checkpoints for resuming.

TYPE: bool DEFAULT: False

skip_save_log

Skip saving TensorBoard logs.

TYPE: bool DEFAULT: False

skip_save_processed_input

Skip caching preprocessed HDF5/JSON files.

TYPE: bool DEFAULT: False

skip_save_predictions

Skip writing prediction CSV files.

TYPE: bool DEFAULT: False

skip_save_eval_stats

Skip writing evaluation statistics JSON.

TYPE: bool DEFAULT: False

skip_collect_predictions

Do not collect postprocessed predictions.

TYPE: bool DEFAULT: False

skip_collect_overall_stats

Do not compute dataset-level aggregate metrics.

TYPE: bool DEFAULT: False

output_directory

Root directory for saved outputs.

TYPE: str DEFAULT: 'results'

random_seed

Seed for weight initialization, data splitting, and shuffling.

TYPE: int DEFAULT: default_random_seed

gpus

GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.

TYPE: str | int | list[int] | None DEFAULT: None

gpu_memory_limit

Maximum memory fraction [0, 1] allowed per GPU device.

TYPE: float | None DEFAULT: None

allow_parallel_threads

Allow Torch multi-threading at the cost of determinism.

TYPE: bool DEFAULT: True

backend

Backend instance or string name for preprocessing and training.

TYPE: Backend | str | None DEFAULT: None

logging_level

Log level sent to stderr.

TYPE: int DEFAULT: INFO

**kwargs

Forwarded to each fold's experiment() call.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict

A tuple (kfold_cv_statistics, kfold_split_indices) where

dict

kfold_cv_statistics maps fold name → training + eval metrics, and

tuple[dict, dict]

kfold_split_indices maps fold name → training/test index arrays.

Source code in ludwig/api.py
@PublicAPI
def kfold_cross_validate(
    num_folds: int,
    config: dict | str,
    dataset: str | None = None,
    data_format: str | None = None,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = False,
    skip_save_predictions: bool = False,
    skip_save_eval_stats: bool = False,
    skip_collect_predictions: bool = False,
    skip_collect_overall_stats: bool = False,
    output_directory: str = "results",
    random_seed: int = default_random_seed,
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    backend: Backend | str | None = None,
    logging_level: int = logging.INFO,
    **kwargs: Any,
) -> tuple[dict, dict]:
    """Perform k-fold cross-validation and return aggregated metrics.

    For every fold a fresh `LudwigModel` is created and `experiment()` is run with the
    fold's training rows as `training_set` and its held-out rows as `test_set`. Each
    fold writes its artifacts into a temporary directory that is removed when the fold
    finishes.

    Args:
        num_folds: Number of folds for cross-validation.
        config: Model config dict or path to a YAML config file.
        dataset: Source containing the full dataset. Note: `'hdf5'` format is
            not supported for k-fold cross-validation.
        data_format: Format hint for the data source. Inferred automatically when
            `None`. Valid values: `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`,
            `'feather'`, `'fwf'`, `'html'`, `'json'`, `'jsonl'`, `'parquet'`,
            `'pickle'`, `'sas'`, `'spss'`, `'stata'`, `'tsv'`.
        skip_save_training_description: Skip saving the experiment description JSON.
        skip_save_training_statistics: Skip saving training statistics JSON.
        skip_save_model: Skip saving model weights after each improvement.
        skip_save_progress: Skip saving per-epoch checkpoints for resuming.
        skip_save_log: Skip saving TensorBoard logs.
        skip_save_processed_input: Skip caching preprocessed HDF5/JSON files.
        skip_save_predictions: Skip writing prediction CSV files.
        skip_save_eval_stats: Skip writing evaluation statistics JSON.
        skip_collect_predictions: Do not collect postprocessed predictions.
        skip_collect_overall_stats: Do not compute dataset-level aggregate metrics.
        output_directory: Root directory for saved outputs. It is created (including
            missing parent directories) if it does not exist yet.
        random_seed: Seed for weight initialization, data splitting, and shuffling.
        gpus: GPUs to use; same syntax as CUDA_VISIBLE_DEVICES.
        gpu_memory_limit: Maximum memory fraction [0, 1] allowed per GPU device.
        allow_parallel_threads: Allow Torch multi-threading at the cost of determinism.
        backend: Backend instance or string name for preprocessing and training.
        logging_level: Log level sent to stderr.
        **kwargs: Accepted for call-site compatibility. This function does not read
            them and does not pass them on to `LudwigModel.experiment()`.

    Returns:
        A tuple `(kfold_cv_statistics, kfold_split_indices)` where
        `kfold_cv_statistics` maps fold name → training + eval metrics (plus an
        `"overall"` entry holding the per-metric mean and standard deviation across
        folds), and `kfold_split_indices` maps fold name → training/test index arrays.

    Raises:
        ValueError: If `num_folds` is `None`.
    """
    # if config is a path, convert to dictionary
    if isinstance(config, str):  # assume path
        config = load_yaml(config)
    backend = initialize_backend(backend or config.get("backend"))

    # check for k_fold
    if num_folds is None:
        raise ValueError("k_fold parameter must be specified")

    logger.info(f"starting {num_folds:d}-fold cross validation")

    # Create output_directory if not available. makedirs also creates missing parent
    # directories and tolerates the directory appearing between check and creation.
    os.makedirs(output_directory, exist_ok=True)

    # prepare data for k-fold processing
    # use Ludwig's utility to facilitate creating a dataframe
    # that is used as the basis for creating folds

    dataset, _, _, _ = load_dataset_uris(dataset, None, None, None, backend)

    # determine data format of provided dataset
    if not data_format or data_format == "auto":
        data_format = figure_data_format(dataset)

    data_df = load_dataset(dataset, data_format=data_format, df_lib=backend.df_engine.df_lib)

    kfold_cv_stats = {}
    kfold_split_indices = {}

    for train_indices, test_indices, fold_num in generate_kfold_splits(data_df, num_folds, random_seed):
        fold_name = "fold_" + str(fold_num)
        with tempfile.TemporaryDirectory() as temp_dir_name:
            curr_train_df = data_df.iloc[train_indices]
            curr_test_df = data_df.iloc[test_indices]

            kfold_split_indices[fold_name] = {
                "training_indices": train_indices,
                "test_indices": test_indices,
            }

            # train and validate model on this fold
            logger.info(f"training on fold {fold_num:d}")

            model = LudwigModel(
                config=config,
                logging_level=logging_level,
                backend=backend,
                gpus=gpus,
                gpu_memory_limit=gpu_memory_limit,
                allow_parallel_threads=allow_parallel_threads,
            )
            # Only the statistics are needed. The preprocessed data and the per-fold
            # output directory are discarded so the `output_directory` argument is
            # not overwritten with a temporary path.
            eval_stats, train_stats, _, _ = model.experiment(
                training_set=curr_train_df,
                test_set=curr_test_df,
                experiment_name="cross_validation",
                model_name=fold_name,
                skip_save_training_description=skip_save_training_description,
                skip_save_training_statistics=skip_save_training_statistics,
                skip_save_model=skip_save_model,
                skip_save_progress=skip_save_progress,
                skip_save_log=skip_save_log,
                skip_save_processed_input=skip_save_processed_input,
                skip_save_predictions=skip_save_predictions,
                skip_save_eval_stats=skip_save_eval_stats,
                skip_collect_predictions=skip_collect_predictions,
                skip_collect_overall_stats=skip_collect_overall_stats,
                output_directory=os.path.join(temp_dir_name, "results"),
                random_seed=random_seed,
            )

            # augment the training statistics with scoring metric from
            # the hold out fold
            if dataclasses.is_dataclass(train_stats):
                train_stats_dict = dataclasses.asdict(train_stats)
            elif hasattr(train_stats, "to_dict"):
                train_stats_dict = train_stats.to_dict()
            else:
                # vars() returns the object's live __dict__; copy it so adding the
                # "fold_eval_stats" key below does not add an attribute to train_stats.
                train_stats_dict = dict(vars(train_stats))
            train_stats_dict["fold_eval_stats"] = eval_stats

            # collect training statistics for this fold
            kfold_cv_stats[fold_name] = train_stats_dict

    # Entries of the eval stats that are skipped when averaging across folds.
    excluded_metrics = {
        "predictions",
        "probabilities",
        "confusion_matrix",
        "overall_stats",
        "per_class_stats",
        "roc_curve",
        "precision_recall_curve",
    }

    # consolidate raw fold metrics across all folds
    raw_kfold_stats = {}
    for fold_name in kfold_cv_stats:
        curr_fold_eval_stats = kfold_cv_stats[fold_name]["fold_eval_stats"]
        for of_name in curr_fold_eval_stats:
            feature_metrics = raw_kfold_stats.setdefault(of_name, {})
            fold_eval_stats_of = curr_fold_eval_stats[of_name]

            for metric in fold_eval_stats_of:
                if metric not in excluded_metrics:
                    feature_metrics.setdefault(metric, []).append(fold_eval_stats_of[metric])

    # calculate overall kfold statistics
    overall_kfold_stats = {}
    for of_name in raw_kfold_stats:
        overall_kfold_stats[of_name] = {}
        for metric in raw_kfold_stats[of_name]:
            mean = np.mean(raw_kfold_stats[of_name][metric])
            std = np.std(raw_kfold_stats[of_name][metric])
            overall_kfold_stats[of_name][metric + "_mean"] = mean
            overall_kfold_stats[of_name][metric + "_std"] = std

    kfold_cv_stats["overall"] = overall_kfold_stats

    logger.info(f"completed {num_folds:d}-fold cross validation")

    return kfold_cv_stats, kfold_split_indices

ludwig.hyperopt.run.hyperopt

hyperopt(config: str | dict, dataset: str | dict | DataFrame = None, training_set: str | dict | DataFrame = None, validation_set: str | dict | DataFrame = None, test_set: str | dict | DataFrame = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, experiment_name: str = 'hyperopt', model_name: str = 'run', resume: bool | None = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = True, skip_save_unprocessed_output: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_save_hyperopt_statistics: bool = False, output_directory: str = 'results', gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] | None = None, tune_callbacks: list[Callback] | None = None, backend: Backend | str = None, random_seed: int = default_random_seed, hyperopt_log_verbosity: int = 3, **kwargs) -> HyperoptResults

Run hyperparameter optimization.

PARAMETER DESCRIPTION
config

Config dict or path to a YAML config file.

TYPE: str | dict

dataset

Source containing the entire dataset. If it has a split column, it will be used for splitting (0: train, 1: validation, 2: test); otherwise the dataset will be randomly split.

TYPE: str | dict | DataFrame DEFAULT: None

training_set

Source containing training data.

TYPE: str | dict | DataFrame DEFAULT: None

validation_set

Source containing validation data.

TYPE: str | dict | DataFrame DEFAULT: None

test_set

Source containing test data.

TYPE: str | dict | DataFrame DEFAULT: None

training_set_metadata

Metadata JSON file or loaded metadata dict. Intermediate preprocessed structure containing feature mappings created the first time an input file is used.

TYPE: str | dict | None DEFAULT: None

data_format

Format to interpret data sources. Inferred automatically if not specified. Valid values: 'auto', 'csv', 'excel', 'feather', 'fwf', 'hdf5', 'html', 'json', 'jsonl', 'parquet', 'pickle', 'sas', 'spss', 'stata', 'tsv'.

TYPE: str | None DEFAULT: None

experiment_name

Name for the experiment.

TYPE: str DEFAULT: 'hyperopt'

model_name

Name of the model being used.

TYPE: str DEFAULT: 'run'

resume

If True, resume from the previous run in output_directory with the same experiment name. If False, create new trials ignoring any prior state. Defaults to resuming when a matching experiment exists, creating new trials otherwise.

TYPE: bool | None DEFAULT: None

skip_save_training_description

Disable saving the description JSON file.

TYPE: bool DEFAULT: False

skip_save_training_statistics

Disable saving training statistics JSON file.

TYPE: bool DEFAULT: False

skip_save_model

Disable saving model weights after each epoch the validation metric improves. The returned model will have weights from the final epoch rather than the best epoch.

TYPE: bool DEFAULT: False

skip_save_progress

Disable saving weights and stats after each epoch (disables training resumption).

TYPE: bool DEFAULT: False

skip_save_log

Disable saving TensorBoard logs.

TYPE: bool DEFAULT: False

skip_save_processed_input

Disable caching preprocessed input as HDF5/JSON files.

TYPE: bool DEFAULT: True

skip_save_unprocessed_output

If True, skip saving raw numpy output files; only postprocessed CSV files are saved.

TYPE: bool DEFAULT: False

skip_save_predictions

Disable saving test prediction CSV files.

TYPE: bool DEFAULT: False

skip_save_eval_stats

Disable saving test statistics JSON file.

TYPE: bool DEFAULT: False

skip_save_hyperopt_statistics

Disable saving hyperopt stats file.

TYPE: bool DEFAULT: False

output_directory

Directory that will contain training statistics, TensorBoard logs, the saved model, and training progress files.

TYPE: str DEFAULT: 'results'

gpus

List of GPUs available for training.

TYPE: str | int | list[int] | None DEFAULT: None

gpu_memory_limit

Maximum memory fraction [0, 1] allowed to allocate per GPU device.

TYPE: float | None DEFAULT: None

allow_parallel_threads

Allow PyTorch to use multithreading parallelism (improves performance at the cost of determinism).

TYPE: bool DEFAULT: True

callbacks

List of Callback objects providing hooks into the Ludwig pipeline.

TYPE: list[Callback] | None DEFAULT: None

tune_callbacks

Additional Ray Tune callbacks.

TYPE: list[Callback] | None DEFAULT: None

backend

Backend or string name of the backend to use for preprocessing and training.

TYPE: Backend | str DEFAULT: None

random_seed

Random seed for weights initialization, splits, and shuffling.

TYPE: int DEFAULT: default_random_seed

hyperopt_log_verbosity

Verbosity of Ray Tune log messages. 0 = silent, 1 = status only, 2 = status + brief results, 3 = status + detailed results.

TYPE: int DEFAULT: 3

RETURNS DESCRIPTION
HyperoptResults

Trial results ordered by descending performance on the target metric.

Source code in ludwig/hyperopt/run.py
def hyperopt(
    config: str | dict,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | None = None,
    validation_set: str | dict | pd.DataFrame | None = None,
    test_set: str | dict | pd.DataFrame | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    experiment_name: str = "hyperopt",
    model_name: str = "run",
    resume: bool | None = None,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = True,
    skip_save_unprocessed_output: bool = False,
    skip_save_predictions: bool = False,
    skip_save_eval_stats: bool = False,
    skip_save_hyperopt_statistics: bool = False,
    output_directory: str = "results",
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] | None = None,
    tune_callbacks: list[TuneCallback] | None = None,
    backend: Backend | str | None = None,
    random_seed: int = default_random_seed,
    hyperopt_log_verbosity: int = 3,
    **kwargs,
) -> HyperoptResults:
    """Run hyperparameter optimization.

    Args:
        config: Config dict or path to a YAML config file.
        dataset: Source containing the entire dataset. If it has a split
            column, it will be used for splitting (0: train, 1: validation,
            2: test); otherwise the dataset will be randomly split.
        training_set: Source containing training data.
        validation_set: Source containing validation data.
        test_set: Source containing test data.
        training_set_metadata: Metadata JSON file or loaded metadata dict.
            Intermediate preprocessed structure containing feature mappings
            created the first time an input file is used.
        data_format: Format to interpret data sources. Inferred automatically
            if not specified. Valid values: ``'auto'``, ``'csv'``,
            ``'excel'``, ``'feather'``, ``'fwf'``, ``'hdf5'``,
            ``'html'``, ``'json'``, ``'jsonl'``, ``'parquet'``,
            ``'pickle'``, ``'sas'``, ``'spss'``, ``'stata'``, ``'tsv'``.
        experiment_name: Name for the experiment.
        model_name: Name of the model being used.
        resume: If ``True``, resume from the previous run in ``output_directory``
            with the same experiment name. If ``False``, create new trials
            ignoring any prior state. Defaults to resuming when a matching
            experiment exists, creating new trials otherwise.
        skip_save_training_description: Disable saving the description JSON
            file.
        skip_save_training_statistics: Disable saving training statistics
            JSON file.
        skip_save_model: Disable saving model weights after each epoch the
            validation metric improves. The returned model will have weights
            from the final epoch rather than the best epoch.
        skip_save_progress: Disable saving weights and stats after each epoch
            (disables training resumption).
        skip_save_log: Disable saving TensorBoard logs.
        skip_save_processed_input: Disable caching preprocessed input as
            HDF5/JSON files.
        skip_save_unprocessed_output: If ``True``, skip saving raw numpy
            output files; only postprocessed CSV files are saved.
        skip_save_predictions: Disable saving test prediction CSV files.
        skip_save_eval_stats: Disable saving test statistics JSON file.
        skip_save_hyperopt_statistics: Disable saving hyperopt stats file.
        output_directory: Directory that will contain training statistics,
            TensorBoard logs, the saved model, and training progress files.
        gpus: List of GPUs available for training.
        gpu_memory_limit: Maximum memory fraction ``[0, 1]`` allowed to
            allocate per GPU device.
        allow_parallel_threads: Allow PyTorch to use multithreading
            parallelism (improves performance at the cost of determinism).
        callbacks: List of ``Callback`` objects providing hooks into the
            Ludwig pipeline.
        tune_callbacks: Additional Ray Tune callbacks.
        backend: Backend or string name of the backend to use for
            preprocessing and training. When omitted, the ``backend`` entry of
            the config is used, falling back to ``"local"``.
        random_seed: Random seed for weights initialization, splits, and
            shuffling.
        hyperopt_log_verbosity: Verbosity of Ray Tune log messages.
            0 = silent, 1 = status only, 2 = status + brief results,
            3 = status + detailed results.
        **kwargs: Passed through unchanged to the hyperopt executor's
            ``execute()`` call.

    Returns:
        Trial results ordered by descending performance on the target metric.

    Raises:
        ValueError: If the config has no hyperopt section; if the hyperopt
            split is unrecognized or no data is available for it; if the
            output feature / metric combination is invalid; or if the
            backend / executor combination is unsupported.
    """
    from ludwig.hyperopt.execution import get_build_hyperopt_executor, RayTuneExecutor

    # check if config is a path or a dict
    if isinstance(config, str):  # assume path
        with open_file(config, "r") as def_file:
            config_dict = yaml.safe_load(def_file)
    else:
        config_dict = config

    if HYPEROPT not in config_dict:
        raise ValueError("Hyperopt Section not present in config")

    # backwards compatibility
    upgraded_config = upgrade_config_dict_to_latest_version(config_dict)

    # Initialize config object
    config_obj = ModelConfig.from_dict(upgraded_config)

    # Retain pre-merged config for hyperopt schema generation
    premerged_config = copy.deepcopy(upgraded_config)

    # Get full config with defaults
    full_config = config_obj.to_dict()  # TODO (Connor): Refactor to use config object

    hyperopt_config = full_config[HYPEROPT]

    # Explicitly default to a local backend to avoid picking up Ray
    # backend from the environment. The backend is resolved and initialized
    # exactly once here; everything below reuses this object.
    backend = backend or config_dict.get("backend") or "local"
    backend = initialize_backend(backend)

    update_hyperopt_params_with_defaults(hyperopt_config)

    # Check if all features are grid type parameters and log UserWarning if needed
    log_warning_if_all_grid_type_parameters(hyperopt_config)

    # Infer max concurrent trials
    if hyperopt_config[EXECUTOR].get(MAX_CONCURRENT_TRIALS) == AUTO:
        hyperopt_config[EXECUTOR][MAX_CONCURRENT_TRIALS] = backend.max_concurrent_trials(hyperopt_config)
        logger.info(f"Setting max_concurrent_trials to {hyperopt_config[EXECUTOR][MAX_CONCURRENT_TRIALS]}")

    # Print hyperopt config
    logger.info("Hyperopt Config")
    logger.info(pformat(hyperopt_config, indent=4))
    logger.info("\n")

    search_alg = hyperopt_config[SEARCH_ALG]
    executor = hyperopt_config[EXECUTOR]
    parameters = hyperopt_config[PARAMETERS]
    split = hyperopt_config[SPLIT]
    output_feature = hyperopt_config["output_feature"]
    metric = hyperopt_config[METRIC]
    goal = hyperopt_config[GOAL]

    ######################
    # check validity of output_feature / metric/ split combination
    ######################
    splitter = get_splitter(**full_config[PREPROCESSING]["split"])
    # For each valid hyperopt split: the explicitly supplied dataset for that split and
    # the index passed to splitter.has_split() to ask whether the splitter produces it.
    split_sources = {
        TRAINING: (training_set, 0),
        VALIDATION: (validation_set, 1),
        TEST: (test_set, 2),
    }
    if split not in split_sources:
        raise ValueError(
            f'unrecognized hyperopt split "{split}". Please provide one of: { ({TRAINING, VALIDATION, TEST}) }'
        )
    provided_set, split_index = split_sources[split]
    # The split is usable if its data was passed in directly or the splitter yields it.
    if provided_set is None and not splitter.has_split(split_index):
        raise ValueError(
            f'The data for the specified split for hyperopt "{split}" '
            "was not provided, "
            "or the split amount specified in the preprocessing section "
            "of the config is not greater than 0"
        )
    if output_feature == COMBINED:
        if metric != LOSS:
            raise ValueError('The only valid metric for "combined" output feature is "loss"')
    else:
        output_feature_names = {of[NAME] for of in full_config[OUTPUT_FEATURES]}
        if output_feature not in output_feature_names:
            raise ValueError(
                f'The output feature specified for hyperopt "{output_feature}" '
                "cannot be found in the config. "
                f'Available ones are: {output_feature_names} and "combined"'
            )

    hyperopt_executor = get_build_hyperopt_executor(executor[TYPE])(
        parameters, output_feature, metric, goal, split, search_alg=search_alg, **executor
    )

    from ludwig.hyperopt.optuna_executor import OptunaExecutor

    # Supported combinations: any executor on a local backend, the Optuna executor on
    # any backend, or the Ray Tune executor on a Ray backend.
    if not (
        isinstance(backend, LocalBackend)
        or isinstance(hyperopt_executor, OptunaExecutor)
        or (isinstance(hyperopt_executor, RayTuneExecutor) and isinstance(backend, RayBackend))
    ):
        raise ValueError(
            "Hyperopt requires using a `local` backend at this time, or "
            "`ray` backend with `ray` executor, or `optuna` executor."
        )

    for callback in callbacks or []:
        callback.on_hyperopt_init(experiment_name)

    if not should_tune_preprocessing(full_config):
        # preprocessing is not being tuned, so generate it once before starting trials
        for callback in callbacks or []:
            callback.on_hyperopt_preprocessing_start(experiment_name)

        model = LudwigModel(
            config=full_config,
            backend=backend,
            gpus=gpus,
            gpu_memory_limit=gpu_memory_limit,
            allow_parallel_threads=allow_parallel_threads,
            callbacks=callbacks,
        )

        preprocessed = model.preprocess(
            dataset=dataset,
            training_set=training_set,
            validation_set=validation_set,
            test_set=test_set,
            training_set_metadata=training_set_metadata,
            data_format=data_format,
            skip_save_processed_input=skip_save_processed_input,
            random_seed=random_seed,
        )
        # From here on the trials receive the preprocessed splits; the raw `dataset`
        # is cleared so only the split arguments are passed to the executor.
        training_set = preprocessed.training_set
        validation_set = preprocessed.validation_set
        test_set = preprocessed.test_set
        training_set_metadata = preprocessed.training_set_metadata
        dataset = None

        dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)

        logger.info("\nDataset Statistics")
        logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))

        for callback in callbacks or []:
            callback.on_hyperopt_preprocessing_end(experiment_name)

    for callback in callbacks or []:
        callback.on_hyperopt_start(experiment_name)

    hyperopt_results = hyperopt_executor.execute(
        premerged_config,
        dataset=dataset,
        training_set=training_set,
        validation_set=validation_set,
        test_set=test_set,
        training_set_metadata=training_set_metadata,
        data_format=data_format,
        experiment_name=experiment_name,
        model_name=model_name,
        resume=resume,
        skip_save_training_description=skip_save_training_description,
        skip_save_training_statistics=skip_save_training_statistics,
        skip_save_model=skip_save_model,
        skip_save_progress=skip_save_progress,
        skip_save_log=skip_save_log,
        skip_save_processed_input=skip_save_processed_input,
        skip_save_unprocessed_output=skip_save_unprocessed_output,
        skip_save_predictions=skip_save_predictions,
        skip_save_eval_stats=skip_save_eval_stats,
        output_directory=output_directory,
        gpus=gpus,
        gpu_memory_limit=gpu_memory_limit,
        allow_parallel_threads=allow_parallel_threads,
        callbacks=callbacks,
        tune_callbacks=tune_callbacks,
        backend=backend,
        random_seed=random_seed,
        hyperopt_log_verbosity=hyperopt_log_verbosity,
        **kwargs,
    )

    # Results are printed and saved only where backend.is_coordinator() is true.
    if backend.is_coordinator():
        print_hyperopt_results(hyperopt_results)

        if not skip_save_hyperopt_statistics:
            with backend.storage.artifacts.use_credentials():
                results_directory = os.path.join(output_directory, experiment_name)
                makedirs(results_directory, exist_ok=True)

                hyperopt_stats = {
                    "hyperopt_config": hyperopt_config,
                    "hyperopt_results": [t.to_dict() for t in hyperopt_results.ordered_trials],
                }

                save_hyperopt_stats(hyperopt_stats, results_directory)
                logger.info(f"Hyperopt stats saved to: {results_directory}")

    # Both the "end" and "finish" hooks are invoked on every callback, in that order.
    for callback in callbacks or []:
        callback.on_hyperopt_end(experiment_name)
        callback.on_hyperopt_finish(experiment_name)

    logger.info("Finished hyperopt")

    return hyperopt_results