LudwigModel
LudwigModel¶
ludwig.api.LudwigModel
¶
LudwigModel(config: str | dict, logging_level: int = logging.ERROR, backend: Backend | str | None = None, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] | None = None)
Class that allows access to high level Ludwig functionalities.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
(Union[str, dict]) in-memory representation of config or string path to a YAML config file.
TYPE:
|
logging_level
|
(int) Log level that will be sent to stderr.
TYPE:
|
backend
|
(Union[Backend, str])
TYPE:
|
gpus
|
(Union[str, int, List[int]], default:
TYPE:
|
gpu_memory_limit
|
(float, default:
TYPE:
|
allow_parallel_threads
|
(bool, default: `True`) allow Torch to use multithreading parallelism to improve performance at the cost of determinism.
TYPE:
|
Constructor for the Ludwig Model class.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
(Union[str, dict]) in-memory representation of config or string path to a YAML config file.
TYPE:
|
logging_level
|
(int) Log level that will be sent to stderr.
TYPE:
|
backend
|
(Union[Backend, str])
TYPE:
|
gpus
|
(Union[str, int, List[int]], default:
TYPE:
|
gpu_memory_limit
|
(float, default:
TYPE:
|
allow_parallel_threads
|
(bool, default:
TYPE:
|
callbacks
|
(list, default:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
(None) |
Source code in ludwig/api.py
def __init__(
    self,
    config: str | dict,
    logging_level: int = logging.ERROR,
    backend: Backend | str | None = None,
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] | None = None,
) -> None:
    """Constructor for the Ludwig Model class.

    # Inputs

    :param config: (Union[str, dict]) in-memory representation of
        config or string path to a YAML config file.
    :param logging_level: (int) Log level that will be sent to stderr.
    :param backend: (Union[Backend, str]) `Backend` or string name
        of backend to use to execute preprocessing / training steps.
    :param gpus: (Union[str, int, List[int]], default: `None`) GPUs
        to use (it uses the same syntax of CUDA_VISIBLE_DEVICES)
    :param gpu_memory_limit: (float, default: `None`) maximum memory fraction
        [0, 1] allowed to allocate per GPU device.
    :param allow_parallel_threads: (bool, default: `True`) allow Torch
        to use multithreading parallelism to improve performance at the
        cost of determinism.
    :param callbacks: (list, default: `None`) a list of
        `ludwig.callbacks.Callback` objects that provide hooks into the
        Ludwig pipeline.

    # Return

    :return: (None) `None`
    """
    # check if config is a path or a dict
    if isinstance(config, str):  # assume path
        config_dict = load_yaml(config)
        self.config_fp = config
    else:
        # deep-copy so later config upgrades/mutations never touch the caller's dict
        config_dict = copy.deepcopy(config)
        self.config_fp = None  # type: ignore [assignment]

    self._user_config = upgrade_config_dict_to_latest_version(config_dict)

    # Initialize the config object
    self.config_obj = ModelConfig.from_dict(self._user_config)

    # setup logging
    self.set_logging_level(logging_level)

    # setup Backend; an explicit `backend` argument wins over one declared in the config
    self.backend = initialize_backend(backend or self._user_config.get("backend"))
    self.callbacks = callbacks if callbacks is not None else []

    # setup PyTorch env (GPU allocation, etc.)
    self.backend.initialize_pytorch(
        gpus=gpus, gpu_memory_limit=gpu_memory_limit, allow_parallel_threads=allow_parallel_threads
    )

    # setup model (built lazily on first train/predict unless zero-shot LLM below)
    self.model = None
    self.training_set_metadata: dict[str, dict] | None = None

    # online training state
    self._online_trainer = None

    # Zero-shot LLM usage.
    if (
        self.config_obj.model_type == MODEL_LLM
        and self.config_obj.trainer.type == "none"
        # Category output features require a vocabulary. The LLM LudwigModel should be initialized with
        # model.train(dataset).
        and self.config_obj.output_features[0].type == "text"
    ):
        self._initialize_llm()
config
property
writable
¶
config: ModelConfigDict
Returns the fully-rendered config of this model including default values.
train
¶
train(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | Dataset | None = None, validation_set: str | dict | DataFrame | Dataset | None = None, test_set: str | dict | DataFrame | Dataset | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, experiment_name: str = 'api_experiment', model_name: str = 'run', model_resume_path: str | None = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, output_directory: str | None = 'results', random_seed: int = default_random_seed, **kwargs) -> TrainingResults
This function is used to perform a full training of the model on the specified dataset.
During training if the skip parameters are False
the model and statistics will be saved in a directory
[output_dir]/[experiment_name]_[model_name]_n where all variables are
resolved to user specified ones and n is an increasing number
starting from 0 used to differentiate among repeated runs.
| PARAMETER | DESCRIPTION |
|---|---|
dataset
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
validation_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
test_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set_metadata
|
(Union[str, dict], default:
TYPE:
|
data_format
|
(str, default:
TYPE:
|
experiment_name
|
(str, default:
TYPE:
|
model_name
|
(str, default:
TYPE:
|
model_resume_path
|
(str, default:
TYPE:
|
skip_save_training_description
|
(bool, default:
TYPE:
|
skip_save_training_statistics
|
(bool, default:
TYPE:
|
skip_save_model
|
(bool, default:
TYPE:
|
skip_save_progress
|
(bool, default:
TYPE:
|
skip_save_log
|
(bool, default:
TYPE:
|
skip_save_processed_input
|
(bool, default:
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
random_seed
|
(int, default:
TYPE:
|
kwargs
|
(dict, default: {}) a dictionary of optional parameters.
DEFAULT:
|
| RETURNS | DESCRIPTION |
|---|---|
TrainingResults
|
(Tuple[Dict, Union[Dict, pd.DataFrame], str]) tuple containing
|
Source code in ludwig/api.py
def train(
self,
dataset: str | dict | pd.DataFrame | None = None,
training_set: str | dict | pd.DataFrame | Dataset | None = None,
validation_set: str | dict | pd.DataFrame | Dataset | None = None,
test_set: str | dict | pd.DataFrame | Dataset | None = None,
training_set_metadata: str | dict | None = None,
data_format: str | None = None,
experiment_name: str = "api_experiment",
model_name: str = "run",
model_resume_path: str | None = None,
skip_save_training_description: bool = False,
skip_save_training_statistics: bool = False,
skip_save_model: bool = False,
skip_save_progress: bool = False,
skip_save_log: bool = False,
skip_save_processed_input: bool = False,
output_directory: str | None = "results",
random_seed: int = default_random_seed,
**kwargs,
) -> TrainingResults:
"""This function is used to perform a full training of the model on the specified dataset.
During training if the skip parameters are False
the model and statistics will be saved in a directory
`[output_dir]/[experiment_name]_[model_name]_n` where all variables are
resolved to user specified ones and `n` is an increasing number
starting from 0 used to differentiate among repeated runs.
# Inputs
:param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing the entire dataset to be used in the experiment.
If it has a split column, it will be used for splitting
(0 for train, 1 for validation, 2 for test),
otherwise the dataset will be randomly split.
:param training_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing training data.
:param validation_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing validation data.
:param test_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing test data.
:param training_set_metadata: (Union[str, dict], default: `None`)
metadata JSON file or loaded metadata. Intermediate preprocessed
structure containing the mappings of the input dataset created the
first time an input file is used in the same directory with the
same name and a '.meta.json' extension.
:param data_format: (str, default: `None`) format to interpret data
sources. Will be inferred automatically if not specified. Valid
formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`,
`'feather'`, `'fwf'`,
`'hdf5'` (cache file produced during previous training),
`'html'` (file containing a single HTML `<table>`),
`'json'`, `'jsonl'`, `'parquet'`,
`'pickle'` (pickled Pandas DataFrame),
`'sas'`, `'spss'`, `'stata'`, `'tsv'`.
:param experiment_name: (str, default: `'api_experiment'`) name for
the experiment.
:param model_name: (str, default: `'run'`) name of the model that is
being used.
:param model_resume_path: (str, default: `None`) resumes training of
the model from the path specified. The config is restored.
In addition to config, training statistics, loss for each
epoch and the state of the optimizer are restored such that
training can be effectively continued from a previously interrupted
training process.
:param skip_save_training_description: (bool, default: `False`)
disables saving the description JSON file.
:param skip_save_training_statistics: (bool, default: `False`)
disables saving training statistics JSON file.
:param skip_save_model: (bool, default: `False`) disables
saving model weights and hyperparameters each time the model
improves. By default Ludwig saves model weights after each epoch
the validation metric improves, but if the model is really big
that can be time consuming. If you do not want to keep
the weights and just find out what performance a model can get
with a set of hyperparameters, use this parameter to skip it,
but the model will not be loadable later on and the returned model
will have the weights obtained at the end of training, instead of
the weights of the epoch with the best validation performance.
:param skip_save_progress: (bool, default: `False`) disables saving
progress each epoch. By default Ludwig saves weights and stats
after each epoch for enabling resuming of training, but if
the model is really big that can be time consuming and will use
twice as much space, use this parameter to skip it, but training
cannot be resumed later on.
:param skip_save_log: (bool, default: `False`) disables saving
TensorBoard logs. By default Ludwig saves logs for the TensorBoard,
but if it is not needed turning it off can slightly increase the
overall speed.
:param skip_save_processed_input: (bool, default: `False`) if input
dataset is provided it is preprocessed and cached by saving an HDF5
and JSON files to avoid running the preprocessing again. If this
parameter is `False`, the HDF5 and JSON file are not saved.
:param output_directory: (str, default: `'results'`) the directory that
will contain the training statistics, TensorBoard logs, the saved
model and the training progress files.
:param random_seed: (int, default: `42`) a random seed that will be
used anywhere there is a call to a random number generator: data
splitting, parameter initialization and training set shuffling
:param kwargs: (dict, default: {}) a dictionary of optional parameters.
# Return
:return: (Tuple[Dict, Union[Dict, pd.DataFrame], str]) tuple containing
`(training_statistics, preprocessed_data, output_directory)`.
`training_statistics` is a nested dictionary of dataset -> feature_name -> metric_name -> List of metrics.
Each metric corresponds to each training checkpoint.
`preprocessed_data` is the tuple containing these three data sets
`(training_set, validation_set, test_set)`.
`output_directory` filepath to where training results are stored.
"""
# NOTE(review): the indentation of this listing was lost during doc extraction, so the
# nesting below is flattened. Consult ludwig/api.py for the authoritative structure.
# Only reset the metadata if the model has not been trained before
if self.training_set_metadata:
logger.warning(
"This model has been trained before. Its architecture has been defined by the original training set "
"(for example, the number of possible categorical outputs). The current training data will be mapped "
"to this architecture. If you want to change the architecture of the model, please concatenate your "
"new training data with the original and train a new model from scratch."
)
training_set_metadata = self.training_set_metadata
if self._user_config.get(HYPEROPT):
print_boxed("WARNING")
logger.warning(HYPEROPT_WARNING)
# setup directories and file names
if model_resume_path is not None:
if path_exists(model_resume_path):
output_directory = model_resume_path
if self.backend.is_coordinator():
logger.info(f"Model resume path '{model_resume_path}' exists, trying to resume training.")
else:
if self.backend.is_coordinator():
logger.info(
f"Model resume path '{model_resume_path}' does not exist, starting training from scratch"
)
model_resume_path = None
if model_resume_path is None:
if self.backend.is_coordinator():
output_directory = get_output_directory(output_directory, experiment_name, model_name)
else:
output_directory = None
# if we are skipping all saving,
# there is no need to create a directory that will remain empty
should_create_output_directory = not (
skip_save_training_description
and skip_save_training_statistics
and skip_save_model
and skip_save_progress
and skip_save_log
and skip_save_processed_input
)
output_url = output_directory
# All subsequent work happens inside the (possibly remote) output-directory context,
# which handles uploading results when the directory is a remote URL.
with upload_output_directory(output_directory) as (output_directory, upload_fn):
train_callbacks = self.callbacks
if upload_fn is not None:
# Upload output files (checkpoints, etc.) to remote storage at the end of
# each epoch and evaluation, in case of failure in the middle of training.
class UploadOnEpochEndCallback(Callback):
def on_eval_end(self, trainer, progress_tracker, save_path):
upload_fn()
def on_epoch_end(self, trainer, progress_tracker, save_path):
upload_fn()
train_callbacks = train_callbacks + [UploadOnEpochEndCallback()]
description_fn = training_stats_fn = model_dir = None
if self.backend.is_coordinator():
if should_create_output_directory:
makedirs(output_directory, exist_ok=True)
description_fn, training_stats_fn, model_dir = get_file_names(output_directory)
# If the caller passed already-preprocessed data plus metadata, skip preprocessing.
if isinstance(training_set, Dataset) and training_set_metadata is not None:
preprocessed_data = (training_set, validation_set, test_set, training_set_metadata)
else:
# save description
if self.backend.is_coordinator():
description = get_experiment_description(
self.config_obj.to_dict(),
dataset=dataset,
training_set=training_set,
validation_set=validation_set,
test_set=test_set,
training_set_metadata=training_set_metadata,
data_format=data_format,
backend=self.backend,
random_seed=random_seed,
)
if not skip_save_training_description:
save_json(description_fn, description)
# print description
experiment_description = [
["Experiment name", experiment_name],
["Model name", model_name],
["Output directory", output_directory],
]
for key, value in description.items():
if key != "config":  # Config is printed separately.
experiment_description.append([key, pformat(value, indent=4)])
if self.backend.is_coordinator():
print_boxed("EXPERIMENT DESCRIPTION")
logger.info(tabulate(experiment_description, tablefmt="fancy_grid"))
print_boxed("LUDWIG CONFIG")
logger.info("User-specified config (with upgrades):\n")
logger.info(pformat(self._user_config, indent=4))
logger.info(
"\nFull config saved to:\n"
f"{output_directory}/{experiment_name}/model/model_hyperparameters.json"
)
preprocessed_data = self.preprocess(  # type: ignore[assignment]
dataset=dataset,
training_set=training_set,
validation_set=validation_set,
test_set=test_set,
training_set_metadata=training_set_metadata,
data_format=data_format,
experiment_name=experiment_name,
model_name=model_name,
model_resume_path=model_resume_path,
skip_save_training_description=skip_save_training_description,
skip_save_training_statistics=skip_save_training_statistics,
skip_save_model=skip_save_model,
skip_save_progress=skip_save_progress,
skip_save_log=skip_save_log,
skip_save_processed_input=skip_save_processed_input,
output_directory=output_directory,
random_seed=random_seed,
**kwargs,
)
training_set, validation_set, test_set, training_set_metadata = preprocessed_data
self.training_set_metadata = training_set_metadata
if self.backend.is_coordinator():
dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)
if not skip_save_model:
# save train set metadata
os.makedirs(model_dir, exist_ok=True)  # type: ignore[arg-type]
save_json(  # type: ignore[arg-type]
os.path.join(model_dir, TRAIN_SET_METADATA_FILE_NAME), training_set_metadata
)
logger.info("\nDataset Statistics")
logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))
for callback in self.callbacks:
callback.on_train_init(
base_config=self._user_config,
experiment_directory=output_directory,
experiment_name=experiment_name,
model_name=model_name,
output_directory=output_directory,
resume_directory=model_resume_path,
)
# Build model if not provided
# if it was provided it means it was already loaded
if not self.model:
if self.backend.is_coordinator():
print_boxed("MODEL")
# update model config with metadata properties derived from training set
update_config_with_metadata(self.config_obj, training_set_metadata)
logger.info("Warnings and other logs:")
self.model = LudwigModel.create_model(self.config_obj, random_seed=random_seed)
# update config with properties determined during model instantiation
update_config_with_model(self.config_obj, self.model)
set_saved_weights_in_checkpoint_flag(self.config_obj)
# auto tune learning rate
if hasattr(self.config_obj.trainer, "learning_rate") and self.config_obj.trainer.learning_rate == AUTO:
detected_learning_rate = get_auto_learning_rate(self.config_obj)
self.config_obj.trainer.learning_rate = detected_learning_rate
with self.backend.create_trainer(
model=self.model,
config=self.config_obj.trainer,
resume=model_resume_path is not None,
skip_save_model=skip_save_model,
skip_save_progress=skip_save_progress,
skip_save_log=skip_save_log,
callbacks=train_callbacks,
random_seed=random_seed,
) as trainer:
# auto tune batch size
self._tune_batch_size(trainer, training_set, random_seed=random_seed)
if (
self.config_obj.model_type == "LLM"
and trainer.config.type == "none"
and self.config_obj.adapter is not None
and self.config_obj.adapter.pretrained_adapter_weights is not None
):
trainer.model.initialize_adapter()  # Load pre-trained adapter weights for inference only
# train model
if self.backend.is_coordinator():
print_boxed("TRAINING")
if not skip_save_model:
self.save_config(model_dir)
for callback in self.callbacks:
callback.on_train_start(
model=self.model,
config=self.config_obj.to_dict(),
config_fp=self.config_fp,
)
# Training proper; the finally block guarantees on_train_end callbacks fire even on failure.
try:
train_stats = trainer.train(
training_set,
validation_set=validation_set,
test_set=test_set,
save_path=model_dir,
)
self.model, train_trainset_stats, train_valiset_stats, train_testset_stats = train_stats
# Calibrates output feature probabilities on validation set if calibration is enabled.
# Must be done after training, and before final model parameters are saved.
if self.backend.is_coordinator():
calibrator = Calibrator(
self.model,
self.backend,
batch_size=trainer.eval_batch_size,
)
if calibrator.calibration_enabled():
if validation_set is None:
logger.warning(
"Calibration uses validation set, but no validation split specified."
"Will use training set for calibration."
"Recommend providing a validation set when using calibration."
)
calibrator.train_calibration(training_set, TRAINING)
elif len(validation_set) < MIN_DATASET_SPLIT_ROWS:
# NOTE(review): message typo below ("much have" should read "must have");
# runtime string left unmodified here.
logger.warning(
f"Validation set size ({len(validation_set)} rows) is too small for calibration."
"Will use training set for calibration."
f"Validation set much have at least {MIN_DATASET_SPLIT_ROWS} rows."
)
calibrator.train_calibration(training_set, TRAINING)
else:
calibrator.train_calibration(validation_set, VALIDATION)
if not skip_save_model:
self.model.save(model_dir)
# Evaluation Frequency
if self.config_obj.model_type == MODEL_ECD and self.config_obj.trainer.steps_per_checkpoint:
evaluation_frequency = EvaluationFrequency(
self.config_obj.trainer.steps_per_checkpoint, EvaluationFrequency.STEP
)
elif self.config_obj.model_type == MODEL_ECD and self.config_obj.trainer.checkpoints_per_epoch:
evaluation_frequency = EvaluationFrequency(
1.0 / self.config_obj.trainer.checkpoints_per_epoch, EvaluationFrequency.EPOCH
)
else:
evaluation_frequency = EvaluationFrequency(1, EvaluationFrequency.EPOCH)
# Unpack train()'s return.
# The statistics are all nested dictionaries of TrainerMetrics: feature_name -> metric_name ->
# List[TrainerMetric], with one entry per training checkpoint, according to steps_per_checkpoint.
# We reduce the dictionary of TrainerMetrics to a simple list of floats for interfacing with Ray
# Tune.
train_stats = TrainingStats(
metric_utils.reduce_trainer_metrics_dict(train_trainset_stats),
metric_utils.reduce_trainer_metrics_dict(train_valiset_stats),
metric_utils.reduce_trainer_metrics_dict(train_testset_stats),
evaluation_frequency,
)
# save training statistics
if self.backend.is_coordinator():
if not skip_save_training_statistics:
save_json(training_stats_fn, train_stats)
# results of the model with highest validation test performance
if (
self.backend.is_coordinator()
and validation_set is not None
and not self.config_obj.trainer.skip_all_evaluation
):
print_boxed("TRAINING REPORT")
training_report = get_training_report(
trainer.validation_field,
trainer.validation_metric,
test_set is not None,
train_valiset_stats,
train_testset_stats,
)
logger.info(tabulate(training_report, tablefmt="fancy_grid"))
logger.info(f"\nFinished: {experiment_name}_{model_name}")
logger.info(f"Saved to: {output_directory}")
finally:
for callback in self.callbacks:
callback.on_train_end(output_directory)
self.training_set_metadata = training_set_metadata
if self.is_merge_and_unload_set():
# For an LLM model trained with a LoRA adapter, merge first, then save the full model.
self.model.merge_and_unload(progressbar=self.config_obj.adapter.postprocessor.progressbar)
if self.backend.is_coordinator() and not skip_save_model:
self.model.save_base_model(model_dir)
elif self.backend.is_coordinator() and not skip_save_model:
self.model.save(model_dir)
# Synchronize model weights between workers
self.backend.sync_model(self.model)
print_boxed("FINISHED")
return TrainingResults(train_stats, preprocessed_data, output_url)
train_online
¶
train_online(dataset: str | dict | DataFrame, training_set_metadata: str | dict | None = None, data_format: str = 'auto', random_seed: int = default_random_seed) -> None
Performs one epoch of training of the model on dataset.
| PARAMETER | DESCRIPTION |
|---|---|
dataset
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set_metadata
|
(Union[str, dict], default:
TYPE:
|
data_format
|
(str, default:
TYPE:
|
random_seed
|
(int, default:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
(None) |
Source code in ludwig/api.py
def train_online(
    self,
    dataset: str | dict | pd.DataFrame,
    training_set_metadata: str | dict | None = None,
    data_format: str = "auto",
    random_seed: int = default_random_seed,
) -> None:
    """Performs one epoch of training of the model on `dataset`.

    # Inputs

    :param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing the entire dataset to be used in the experiment.
        If it has a split column, it will be used for splitting (0 for train,
        1 for validation, 2 for test), otherwise the dataset will be
        randomly split.
    :param training_set_metadata: (Union[str, dict], default: `None`)
        metadata JSON file or loaded metadata. Intermediate preprocessed
        structure containing the mappings of the input
        dataset created the first time an input file is used in the same
        directory with the same name and a '.meta.json' extension.
    :param data_format: (str, default: `'auto'`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
        `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
        `'stata'`, `'tsv'`.
    :param random_seed: (int, default: `42`) a random seed that is going to be
        used anywhere there is a call to a random number generator: data
        splitting, parameter initialization and training set shuffling

    # Return

    :return: (None) `None`
    """
    # Reuse metadata from a previous call (or load) when the caller does not supply any.
    training_set_metadata = training_set_metadata or self.training_set_metadata
    preprocessing_params = get_preprocessing_params(self.config_obj)

    with provision_preprocessing_workers(self.backend):
        # TODO (Connor): Refactor to use self.config_obj
        training_dataset, _, _, training_set_metadata = preprocess_for_training(
            self.config_obj.to_dict(),
            training_set=dataset,
            training_set_metadata=training_set_metadata,
            data_format=data_format,
            skip_save_processed_input=True,
            preprocessing_params=preprocessing_params,
            backend=self.backend,
            random_seed=random_seed,
            callbacks=self.callbacks,
        )

    if not self.training_set_metadata:
        self.training_set_metadata = training_set_metadata

    # Lazily build the model on the first call.
    if not self.model:
        update_config_with_metadata(self.config_obj, training_set_metadata)
        self.model = LudwigModel.create_model(self.config_obj, random_seed=random_seed)
        # update config with properties determined during model instantiation
        update_config_with_model(self.config_obj, self.model)
        set_saved_weights_in_checkpoint_flag(self.config_obj)

    # Lazily build the trainer on the first call and keep it for subsequent epochs.
    if not self._online_trainer:
        self._online_trainer = self.backend.create_trainer(
            config=self.config_obj.trainer, model=self.model, random_seed=random_seed
        )
        # NOTE(review): batch-size tuning nested under first-trainer creation is inferred
        # from the flattened doc listing — confirm against ludwig/api.py.
        self._tune_batch_size(self._online_trainer, dataset, random_seed=random_seed)

    self.model = self._online_trainer.train_online(training_dataset)
save_dequantized_base_model
¶
save_dequantized_base_model(save_path: str) -> None
Upscales quantized weights of a model to fp16 and saves the result in a specified folder.
Args: save_path (str): The path to the folder where the upscaled model weights will be saved.
Raises: ValueError: If the model type is not 'llm' or if quantization is not enabled or the number of bits is not 4 or 8. RuntimeError: If no GPU is available, as GPU is required for quantized models.
Returns: None
Source code in ludwig/api.py
def save_dequantized_base_model(self, save_path: str) -> None:
    """Upscales quantized weights of a model to fp16 and saves the result in a specified folder.

    Args:
        save_path (str): The path to the folder where the upscaled model weights will be saved.

    Raises:
        ValueError:
            If the model type is not 'llm' or if quantization is not enabled or the number of bits is not 4 or 8.
        RuntimeError:
            If no GPU is available, as GPU is required for quantized models.

    Returns:
        None
    """
    # Guard clauses: dequantization only applies to 4-bit quantized LLM models on GPU.
    if self.config_obj.model_type != MODEL_LLM:
        raise ValueError(
            f"Model type {self.config_obj.model_type} is not supported by this method. Only `llm` model type is "
            "supported."
        )
    if not self.config_obj.quantization:
        raise ValueError(
            "Quantization is not enabled in your Ludwig model config. "
            "To enable quantization, set `quantization` to `{'bits': 4}` or `{'bits': 8}` in your model config."
        )
    if self.config_obj.quantization.bits != 4:
        raise ValueError(
            "This method only works with quantized models with 4 bits. "
            "Support for 8-bit quantized models will be added in a future release."
        )
    if not torch.cuda.is_available():
        raise RuntimeError("GPU is required for quantized models but no GPU found.")

    # Create the LLM model class instance with the loaded LLM if it hasn't been initialized yet.
    if not self.model:
        self.model = LudwigModel.create_model(self.config_obj)

    self.model.save_dequantized_base_model(save_path)

    logger.info(
        "If you want to upload this model to huggingface.co, run the following Python commands: \n"
        "from ludwig.utils.hf_utils import upload_folder_to_hfhub; \n"
        f"upload_folder_to_hfhub(repo_id='desired/huggingface/repo/name', folder_path='{save_path}')"
    )
generate
¶
generate(input_strings: str | list[str], generation_config: dict | None = None, streaming: bool | None = False) -> str | list[str]
A simple generate() method that directly uses the underlying transformers library to generate text.
Args: input_strings (Union[str, List[str]]): Input text or list of texts to generate from. generation_config (Optional[dict]): Configuration for text generation. streaming (Optional[bool]): If True, enable streaming output.
Returns: Union[str, List[str]]: Generated text or list of generated texts.
Source code in ludwig/api.py
def generate(
    self,
    input_strings: str | list[str],
    generation_config: dict | None = None,
    streaming: bool | None = False,
) -> str | list[str]:
    """A simple generate() method that directly uses the underlying transformers library to generate text.

    Args:
        input_strings (Union[str, List[str]]): Input text or list of texts to generate from.
        generation_config (Optional[dict]): Configuration for text generation.
        streaming (Optional[bool]): If True, enable streaming output.

    Returns:
        Union[str, List[str]]: Generated text or list of generated texts.
    """
    if self.config_obj.model_type != MODEL_LLM:
        raise ValueError(
            f"Model type {self.config_obj.model_type} is not supported by this method. Only `llm` model type is "
            "supported."
        )
    if not torch.cuda.is_available():
        # GPU is generally well-advised for working with LLMs and is required for loading quantized models, see
        # https://github.com/ludwig-ai/ludwig/issues/3695.
        raise ValueError("GPU is not available.")

    # TODO(Justin): Decide if it's worth folding padding_side handling into llm.py's tokenizer initialization.
    # For batch inference with models like facebook/opt-350m, if the tokenizer padding side is off, HF prints a
    # warning, e.g.:
    # "A decoder-only architecture is being used, but right-padding was detected! For correct generation results, "
    # "please set `padding_side='left'` when initializing the tokenizer.
    padding_side = "left" if not self.model.model.config.is_encoder_decoder else "right"
    tokenizer = HFTokenizer(self.config_obj.base_model, padding_side=padding_side)

    # Temporarily override the model's generation config for the duration of this call.
    with self.model.use_generation_config(generation_config):
        start_time = time.time()
        tokenized_inputs = tokenizer.tokenizer(input_strings, return_tensors="pt", padding=True)
        input_ids = tokenized_inputs["input_ids"].to("cuda")
        attention_mask = tokenized_inputs["attention_mask"].to("cuda")
        if streaming:
            streamer = create_text_streamer(tokenizer.tokenizer)
            outputs = self._generate_streaming_outputs(input_strings, input_ids, attention_mask, streamer)
        else:
            outputs = self._generate_non_streaming_outputs(input_strings, input_ids, attention_mask)
        decoded_outputs = tokenizer.tokenizer.batch_decode(outputs, skip_special_tokens=True)
        logger.info(f"Finished generating in: {(time.time() - start_time):.2f}s.")
    # Single input -> single string; batch input -> list of strings.
    # NOTE(review): return placement relative to the `with` block is inferred from the
    # flattened doc listing; behavior is equivalent either way since the context only
    # restores the generation config on exit.
    return decoded_outputs[0] if len(decoded_outputs) == 1 else decoded_outputs
predict
¶
predict(dataset: str | dict | DataFrame | None = None, data_format: str = None, split: str = FULL, batch_size: int = 128, generation_config: dict | None = None, skip_save_unprocessed_output: bool = True, skip_save_predictions: bool = True, output_directory: str = 'results', return_type: str | dict | DataFrame = pd.DataFrame, callbacks: list[Callback] | None = None, **kwargs) -> tuple[dict | pd.DataFrame, str]
Using a trained model, make predictions from the provided dataset.
| PARAMETER | DESCRIPTION |
|---|---|
dataset
|
(Union[str, dict, pandas.DataFrame]): source containing the entire dataset to be evaluated.
TYPE:
|
data_format
|
(str, default:
TYPE:
|
split
|
(str, default=
TYPE:
|
batch_size
|
(int, default: 128) size of batch to use when making predictions.
TYPE:
|
generation_config
|
(Dict, default:
TYPE:
|
skip_save_unprocessed_output
|
(bool, default:
TYPE:
|
skip_save_predictions
|
(bool, default:
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
return_type
|
(Union[str, dict, pandas.DataFrame], default: pd.DataFrame) indicates the format of the returned predictions.
TYPE:
|
callbacks
|
(Optional[List[Callback]], default: None) optional list of callbacks to use during this predict operation. Any callbacks already registered to the model will be preserved.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict | DataFrame, str]
|
(Tuple[Union[dict, pd.DataFrame], str])
|
Source code in ludwig/api.py
def predict(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int = 128,
    generation_config: dict | None = None,
    skip_save_unprocessed_output: bool = True,
    skip_save_predictions: bool = True,
    output_directory: str = "results",
    return_type: str | dict | pd.DataFrame = pd.DataFrame,
    callbacks: list[Callback] | None = None,
    **kwargs,
) -> tuple[dict | pd.DataFrame, str]:
    """Using a trained model, make predictions from the provided dataset.

    # Inputs
    :param dataset: (Union[str, dict, pandas.DataFrame]): source containing the entire dataset to be evaluated.
    :param data_format: (str, default: `None`) format to interpret data sources. Will be inferred automatically
        if not specified. Valid formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training), `'html'` (file containing a single
        HTML `<table>`), `'json'`, `'jsonl'`, `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`,
        `'spss'`, `'stata'`, `'tsv'`.
    :param split: (str, default= `'full'`): if the input dataset contains a split column, this parameter
        indicates which split of the data to use. Possible values are `'full'`, `'training'`, `'validation'`,
        `'test'`.
    :param batch_size: (int, default: 128) size of batch to use when making predictions.
    :param generation_config: (Dict, default: `None`) config for the generation of the
        predictions. If `None`, the config that was used during model training is
        used. This is only used if the model type is LLM. Otherwise, this parameter is
        ignored. See
        [Large Language Models](https://ludwig.ai/latest/configuration/large_language_model/#generation) under
        "Generation" for an example generation config.
    :param skip_save_unprocessed_output: (bool, default: `True`) if this parameter is `False`, predictions and
        their probabilities are saved in both raw unprocessed numpy files containing tensors and as
        postprocessed CSV files (one for each output feature). If this parameter is `True`, only the CSV ones
        are saved and the numpy ones are skipped.
    :param skip_save_predictions: (bool, default: `True`) skips saving test predictions CSV files.
    :param output_directory: (str, default: `'results'`) the directory where prediction CSV files and raw
        unprocessed numpy outputs are written, when saving is enabled.
    :param return_type: (Union[str, dict, pandas.DataFrame], default: pd.DataFrame) indicates the format of the
        returned predictions.
    :param callbacks: (Optional[List[Callback]], default: None) optional list of callbacks to use during this
        predict operation. Any callbacks already registered to the model will be preserved.

    # Return
    :return `(predictions, output_directory)`: (Tuple[Union[dict, pd.DataFrame], str])
        `predictions` predictions from the provided dataset,
        `output_directory` filepath string to where data was stored.
    """
    self._check_initialization()

    # preprocessing: turn the raw dataset into model-ready tensors using the
    # metadata captured at training time; outputs are not needed for prediction.
    start_time = time.time()
    logger.debug("Preprocessing")
    dataset, _ = preprocess_for_prediction(  # TODO (Connor): Refactor to use self.config_obj
        self.config_obj.to_dict(),
        dataset=dataset,
        training_set_metadata=self.training_set_metadata,
        data_format=data_format,
        split=split,
        include_outputs=False,
        backend=self.backend,
        # per-call callbacks are appended to the model's registered callbacks
        callbacks=self.callbacks + (callbacks or []),
    )

    logger.debug("Predicting")
    with self.backend.create_predictor(self.model, batch_size=batch_size) as predictor:
        # generation_config only affects LLM-type models (see docstring)
        with self.model.use_generation_config(generation_config):
            predictions = predictor.batch_predict(
                dataset,
            )

    if self.backend.is_coordinator():
        # if we are skipping all saving,
        # there is no need to create a directory that will remain empty
        should_create_exp_dir = not (skip_save_unprocessed_output and skip_save_predictions)
        if should_create_exp_dir:
            makedirs(output_directory, exist_ok=True)

    logger.debug("Postprocessing")
    # Only the coordinator may write raw numpy outputs; workers always skip them.
    postproc_predictions = postprocess(
        predictions,
        self.model.output_features,
        self.training_set_metadata,
        output_directory=output_directory,
        backend=self.backend,
        skip_save_unprocessed_output=skip_save_unprocessed_output or not self.backend.is_coordinator(),
    )
    converted_postproc_predictions = convert_predictions(
        postproc_predictions, self.model.output_features, return_type=return_type, backend=self.backend
    )

    if self.backend.is_coordinator():
        if not skip_save_predictions:
            save_prediction_outputs(
                postproc_predictions, self.model.output_features, output_directory, self.backend
            )

            logger.info(f"Saved to: {output_directory}")

    logger.info(f"Finished predicting in: {(time.time() - start_time):.2f}s.")

    return converted_postproc_predictions, output_directory
evaluate
¶
evaluate(dataset: str | dict | DataFrame | None = None, data_format: str | None = None, split: str = FULL, batch_size: int | None = None, skip_save_unprocessed_output: bool = True, skip_save_predictions: bool = True, skip_save_eval_stats: bool = True, collect_predictions: bool = False, collect_overall_stats: bool = False, output_directory: str = 'results', return_type: str | dict | DataFrame = pd.DataFrame, **kwargs) -> tuple[dict, dict | pd.DataFrame, str]
This function is used to predict the output variables given the input variables using the trained model and compute test statistics like performance measures, confusion matrices and the like.
| PARAMETER | DESCRIPTION |
|---|---|
dataset
|
(Union[str, dict, pandas.DataFrame]) source containing the entire dataset to be evaluated.
TYPE:
|
data_format
|
(str, default:
TYPE:
|
split
|
(str, default=
TYPE:
|
batch_size
|
(int, default: None) size of batch to use when making predictions. Defaults to model config eval_batch_size
TYPE:
|
skip_save_unprocessed_output
|
(bool, default:
TYPE:
|
skip_save_predictions
|
(bool, default:
TYPE:
|
skip_save_eval_stats
|
(bool, default:
TYPE:
|
collect_predictions
|
(bool, default:
TYPE:
|
collect_overall_stats
|
(bool, default: False) if
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
return_type
|
(Union[str, dict, pd.DataFrame], default: pandas.DataFrame) indicates the format of the returned predictions.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict, dict | DataFrame, str]
|
( |
Source code in ludwig/api.py
def evaluate(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int | None = None,
    skip_save_unprocessed_output: bool = True,
    skip_save_predictions: bool = True,
    skip_save_eval_stats: bool = True,
    collect_predictions: bool = False,
    collect_overall_stats: bool = False,
    output_directory: str = "results",
    return_type: str | dict | pd.DataFrame = pd.DataFrame,
    **kwargs,
) -> tuple[dict, dict | pd.DataFrame, str]:
    """This function is used to predict the output variables given the input variables using the trained model
    and compute test statistics like performance measures, confusion matrices and the like.

    # Inputs
    :param dataset: (Union[str, dict, pandas.DataFrame]) source containing
        the entire dataset to be evaluated.
    :param data_format: (str, default: `None`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
        `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
        `'stata'`, `'tsv'`.
    :param split: (str, default=`'full'`): if the input dataset contains
        a split column, this parameter indicates which split of the data
        to use. Possible values are `'full'`, `'training'`, `'validation'`, `'test'`.
    :param batch_size: (int, default: None) size of batch to use when making
        predictions. Defaults to model config eval_batch_size
    :param skip_save_unprocessed_output: (bool, default: `True`) if this
        parameter is `False`, predictions and their probabilities are saved
        in both raw unprocessed numpy files containing tensors and as
        postprocessed CSV files (one for each output feature).
        If this parameter is `True`, only the CSV ones are saved and the
        numpy ones are skipped.
    :param skip_save_predictions: (bool, default: `True`) skips saving
        test predictions CSV files.
    :param skip_save_eval_stats: (bool, default: `True`) skips saving
        test statistics JSON file.
    :param collect_predictions: (bool, default: `False`) if `True`
        collects post-processed predictions during eval.
    :param collect_overall_stats: (bool, default: False) if `True`
        collects overall stats during eval.
    :param output_directory: (str, default: `'results'`) the directory where
        prediction CSV files and the evaluation statistics JSON file are
        written, when saving is enabled.
    :param return_type: (Union[str, dict, pd.DataFrame], default: pandas.DataFrame) indicates
        the format of the returned predictions.

    # Return
    :return: (`evaluation_statistics`, `predictions`, `output_directory`)
        `evaluation_statistics` dictionary containing evaluation performance
        statistics,
        `postprocess_predictions` contains predicted values,
        `output_directory` is location where results are stored.
    """
    self._check_initialization()

    for callback in self.callbacks:
        callback.on_evaluation_start()

    # preprocessing: outputs are included since evaluation needs ground truth
    logger.debug("Preprocessing")
    dataset, training_set_metadata = preprocess_for_prediction(  # TODO (Connor): Refactor to use self.config_obj
        self.config_obj.to_dict(),
        dataset=dataset,
        training_set_metadata=self.training_set_metadata,
        data_format=data_format,
        split=split,
        include_outputs=True,
        backend=self.backend,
        callbacks=self.callbacks,
    )

    # Fallback to use eval_batch_size or batch_size if not provided
    if batch_size is None:
        # Requires dictionary getter since some trainer configs may not have a batch_size param
        batch_size = self.config_obj.trainer.to_dict().get(
            EVAL_BATCH_SIZE, None
        ) or self.config_obj.trainer.to_dict().get(BATCH_SIZE, None)

    logger.debug("Predicting")
    with self.backend.create_predictor(self.model, batch_size=batch_size) as predictor:
        # Predictions must be collected if overall stats are requested, since
        # overall stats are computed from the collected predictions below.
        eval_stats, predictions = predictor.batch_evaluation(
            dataset,
            collect_predictions=collect_predictions or collect_overall_stats,
        )

    # calculate the overall metrics
    if collect_overall_stats:
        dataset = dataset.to_df()
        overall_stats = calculate_overall_stats(
            self.model.output_features, predictions, dataset, training_set_metadata
        )
        # Merge the overall stats into the per-feature eval stats.
        eval_stats = {
            of_name: (
                {**eval_stats[of_name], **overall_stats[of_name]}
                # account for presence of 'combined' key
                if of_name in overall_stats
                else {**eval_stats[of_name]}
            )
            for of_name in eval_stats
        }

    if self.backend.is_coordinator():
        # if we are skipping all saving,
        # there is no need to create a directory that will remain empty
        should_create_exp_dir = not (
            skip_save_unprocessed_output and skip_save_predictions and skip_save_eval_stats
        )
        if should_create_exp_dir:
            makedirs(output_directory, exist_ok=True)

    if collect_predictions:
        logger.debug("Postprocessing")
        # Only the coordinator may write raw numpy outputs; workers always skip them.
        postproc_predictions = postprocess(
            predictions,
            self.model.output_features,
            self.training_set_metadata,
            output_directory=output_directory,
            backend=self.backend,
            skip_save_unprocessed_output=skip_save_unprocessed_output or not self.backend.is_coordinator(),
        )
    else:
        postproc_predictions = predictions  # = {}

    if self.backend.is_coordinator():
        should_save_predictions = (
            collect_predictions and postproc_predictions is not None and not skip_save_predictions
        )
        if should_save_predictions:
            save_prediction_outputs(
                postproc_predictions, self.model.output_features, output_directory, self.backend
            )

        print_evaluation_stats(eval_stats)
        if not skip_save_eval_stats:
            save_evaluation_stats(eval_stats, output_directory)

        if should_save_predictions or not skip_save_eval_stats:
            logger.info(f"Saved to: {output_directory}")

    if collect_predictions:
        postproc_predictions = convert_predictions(
            postproc_predictions, self.model.output_features, return_type=return_type, backend=self.backend
        )

    for callback in self.callbacks:
        callback.on_evaluation_end()

    return eval_stats, postproc_predictions, output_directory
experiment
¶
experiment(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | None = None, validation_set: str | dict | DataFrame | None = None, test_set: str | dict | DataFrame | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, experiment_name: str = 'experiment', model_name: str = 'run', model_resume_path: str | None = None, eval_split: str = TEST, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, skip_save_unprocessed_output: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_collect_predictions: bool = False, skip_collect_overall_stats: bool = False, output_directory: str = 'results', random_seed: int = default_random_seed, **kwargs) -> tuple[dict | None, TrainingStats, PreprocessedDataset, str]
Trains a model on a dataset's training and validation splits and uses it to predict on the test split. It saves the trained model and the statistics of training and testing.
| PARAMETER | DESCRIPTION |
|---|---|
dataset
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
validation_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
test_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set_metadata
|
(Union[str, dict], default:
TYPE:
|
data_format
|
(str, default:
TYPE:
|
experiment_name
|
(str, default:
TYPE:
|
model_name
|
(str, default:
TYPE:
|
model_resume_path
|
(str, default:
TYPE:
|
eval_split
|
(str, default:
TYPE:
|
skip_save_training_description
|
(bool, default:
TYPE:
|
skip_save_training_statistics
|
(bool, default:
TYPE:
|
skip_save_model
|
(bool, default:
TYPE:
|
skip_save_progress
|
(bool, default:
TYPE:
|
skip_save_log
|
(bool, default:
TYPE:
|
skip_save_processed_input
|
(bool, default:
TYPE:
|
skip_save_unprocessed_output
|
(bool, default:
TYPE:
|
skip_save_predictions
|
(bool, default:
TYPE:
|
skip_save_eval_stats
|
(bool, default:
TYPE:
|
skip_collect_predictions
|
(bool, default:
TYPE:
|
skip_collect_overall_stats
|
(bool, default:
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
random_seed
|
(int: default: 42) random seed used for weights initialization, splits and any other random function.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict | None, TrainingStats, PreprocessedDataset, str]
|
(Tuple[dict, dict, tuple, str])
|
Source code in ludwig/api.py
def experiment(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | None = None,
    validation_set: str | dict | pd.DataFrame | None = None,
    test_set: str | dict | pd.DataFrame | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    experiment_name: str = "experiment",
    model_name: str = "run",
    model_resume_path: str | None = None,
    eval_split: str = TEST,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = False,
    skip_save_unprocessed_output: bool = False,
    skip_save_predictions: bool = False,
    skip_save_eval_stats: bool = False,
    skip_collect_predictions: bool = False,
    skip_collect_overall_stats: bool = False,
    output_directory: str = "results",
    random_seed: int = default_random_seed,
    **kwargs,
) -> tuple[dict | None, TrainingStats, PreprocessedDataset, str]:
    """Trains a model on a dataset's training and validation splits and uses it to predict on the test split.

    It saves the trained model and the statistics of training and testing.

    # Inputs
    :param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing the entire dataset to be used in the experiment.
        If it has a split column, it will be used for splitting (0 for train,
        1 for validation, 2 for test), otherwise the dataset will be
        randomly split.
    :param training_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing training data.
    :param validation_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing validation data.
    :param test_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing test data.
    :param training_set_metadata: (Union[str, dict], default: `None`)
        metadata JSON file or loaded metadata. Intermediate preprocessed
        structure containing the mappings of the input
        dataset created the first time an input file is used in the same
        directory with the same name and a '.meta.json' extension.
    :param data_format: (str, default: `None`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
        `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
        `'stata'`, `'tsv'`.
    :param experiment_name: (str, default: `'experiment'`) name for
        the experiment.
    :param model_name: (str, default: `'run'`) name of the model that is
        being used.
    :param model_resume_path: (str, default: `None`) resumes training of
        the model from the path specified. The config is restored.
        In addition to config, training statistics and loss for
        epoch and the state of the optimizer are restored such that
        training can be effectively continued from a previously interrupted
        training process.
    :param eval_split: (str, default: `test`) split on which
        to perform evaluation. Valid values are `training`, `validation`
        and `test`.
    :param skip_save_training_description: (bool, default: `False`) disables
        saving the description JSON file.
    :param skip_save_training_statistics: (bool, default: `False`) disables
        saving training statistics JSON file.
    :param skip_save_model: (bool, default: `False`) disables
        saving model weights and hyperparameters each time the model
        improves. By default Ludwig saves model weights after each epoch
        the validation metric improves, but if the model is really big
        that can be time consuming. If you do not want to keep
        the weights and just find out what performance a model can get
        with a set of hyperparameters, use this parameter to skip it,
        but the model will not be loadable later on and the returned model
        will have the weights obtained at the end of training, instead of
        the weights of the epoch with the best validation performance.
    :param skip_save_progress: (bool, default: `False`) disables saving
        progress each epoch. By default Ludwig saves weights and stats
        after each epoch for enabling resuming of training, but if
        the model is really big that can be time consuming and will use
        twice as much space, use this parameter to skip it, but training
        cannot be resumed later on.
    :param skip_save_log: (bool, default: `False`) disables saving
        TensorBoard logs. By default Ludwig saves logs for the TensorBoard,
        but if it is not needed turning it off can slightly increase the
        overall speed.
    :param skip_save_processed_input: (bool, default: `False`) if input
        dataset is provided it is preprocessed and cached by saving an HDF5
        and JSON files to avoid running the preprocessing again. If this
        parameter is `False`, the HDF5 and JSON file are not saved.
    :param skip_save_unprocessed_output: (bool, default: `False`) by default
        predictions and their probabilities are saved in both raw
        unprocessed numpy files containing tensors and as postprocessed
        CSV files (one for each output feature). If this parameter is True,
        only the CSV ones are saved and the numpy ones are skipped.
    :param skip_save_predictions: (bool, default: `False`) skips saving test
        predictions CSV files
    :param skip_save_eval_stats: (bool, default: `False`) skips saving test
        statistics JSON file
    :param skip_collect_predictions: (bool, default: `False`) skips
        collecting post-processed predictions during eval.
    :param skip_collect_overall_stats: (bool, default: `False`) skips
        collecting overall stats during eval.
    :param output_directory: (str, default: `'results'`) the directory that
        will contain the training statistics, TensorBoard logs, the saved
        model and the training progress files.
    :param random_seed: (int: default: 42) random seed used for weights
        initialization, splits and any other random function.

    # Return
    :return: (Tuple[dict, dict, tuple, str])
        `(evaluation_statistics, training_statistics, preprocessed_data, output_directory)`
        `evaluation_statistics` dictionary with evaluation performance
        statistics on the test_set,
        `training_statistics` is a nested dictionary of dataset -> feature_name -> metric_name -> List of metrics.
        Each metric corresponds to each training checkpoint.
        `preprocessed_data` tuple containing preprocessed
        `(training_set, validation_set, test_set)`, `output_directory`
        filepath string to where results are stored.
    """
    # Running a config that contains a hyperopt section through `experiment`
    # does not run hyperopt; warn the user explicitly.
    if self._user_config.get(HYPEROPT):
        print_boxed("WARNING")
        logger.warning(HYPEROPT_WARNING)

    train_stats, preprocessed_data, output_directory = self.train(
        dataset=dataset,
        training_set=training_set,
        validation_set=validation_set,
        test_set=test_set,
        training_set_metadata=training_set_metadata,
        data_format=data_format,
        experiment_name=experiment_name,
        model_name=model_name,
        model_resume_path=model_resume_path,
        skip_save_training_description=skip_save_training_description,
        skip_save_training_statistics=skip_save_training_statistics,
        skip_save_model=skip_save_model,
        skip_save_progress=skip_save_progress,
        skip_save_log=skip_save_log,
        skip_save_processed_input=skip_save_processed_input,
        skip_save_unprocessed_output=skip_save_unprocessed_output,
        output_directory=output_directory,
        random_seed=random_seed,
    )

    training_set, validation_set, test_set, training_set_metadata = preprocessed_data

    # Select the split to evaluate on; an unrecognized eval_split falls
    # through to the validation-set default with a warning.
    eval_set = validation_set
    if eval_split == TRAINING:
        eval_set = training_set
    elif eval_split == VALIDATION:
        eval_set = validation_set
    elif eval_split == TEST:
        eval_set = test_set
    else:
        logger.warning(f"Eval split {eval_split} not supported. " f"Using validation set instead")

    if eval_set is not None:
        trainer_dict = self.config_obj.trainer.to_dict()
        # Prefer the trainer's eval_batch_size, falling back to batch_size.
        batch_size = trainer_dict.get(EVAL_BATCH_SIZE, trainer_dict.get(BATCH_SIZE, None))

        # predict
        try:
            eval_stats, _, _ = self.evaluate(
                eval_set,
                data_format=data_format,
                batch_size=batch_size,
                output_directory=output_directory,
                skip_save_unprocessed_output=skip_save_unprocessed_output,
                skip_save_predictions=skip_save_predictions,
                skip_save_eval_stats=skip_save_eval_stats,
                collect_predictions=not skip_collect_predictions,
                collect_overall_stats=not skip_collect_overall_stats,
                return_type="dict",
            )
        except NotImplementedError:
            # Some backends/models do not implement evaluation; the trained
            # model and stats are still returned.
            logger.warning(
                "Skipping evaluation as the necessary methods are not "
                "supported. Full exception below:\n"
                f"{traceback.format_exc()}"
            )
            eval_stats = None
    else:
        logger.warning(f"The evaluation set {eval_set} was not provided. " f"Skipping evaluation")
        eval_stats = None

    return eval_stats, train_stats, preprocessed_data, output_directory
collect_weights
¶
collect_weights(tensor_names: list[str] = None, **kwargs) -> list
Load a pre-trained model and collect the tensors with a specific name.
| PARAMETER | DESCRIPTION |
|---|---|
tensor_names
|
(list, default:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list
|
(list) List of tensors |
Source code in ludwig/api.py
def collect_weights(self, tensor_names: list[str] = None, **kwargs) -> list:
    """Collect the weight tensors of a trained model by name.

    # Inputs
    :param tensor_names: (list, default: `None`) List of tensor names to collect
        weights

    # Return
    :return: (list) List of tensors
    """
    self._check_initialization()
    # Delegate tensor lookup to the underlying model.
    return self.model.collect_weights(tensor_names)
collect_activations
¶
collect_activations(layer_names: list[str], dataset: str | dict[str, list] | DataFrame, data_format: str | None = None, split: str = FULL, batch_size: int = 128, **kwargs) -> list
Loads a pre-trained model and input data to collect the values of the activations contained in the tensors.
| PARAMETER | DESCRIPTION |
|---|---|
layer_names
|
(list) list of strings for layer names in the model to collect activations.
TYPE:
|
dataset
|
(Union[str, Dict[str, list], pandas.DataFrame]) source containing the data to make predictions.
TYPE:
|
data_format
|
(str, default:
TYPE:
|
split
|
(str, default=
TYPE:
|
batch_size
|
(int, default: 128) size of batch to use when making predictions.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list
|
(list) list of collected tensors. |
Source code in ludwig/api.py
def collect_activations(
    self,
    layer_names: list[str],
    dataset: str | dict[str, list] | pd.DataFrame,
    data_format: str | None = None,
    split: str = FULL,
    batch_size: int = 128,
    **kwargs,
) -> list:
    """Run the loaded model over ``dataset`` and gather the activation tensors
    produced by the named layers.

    # Inputs
    :param layer_names: (list) list of strings for layer names in the model
        to collect activations.
    :param dataset: (Union[str, Dict[str, list], pandas.DataFrame]) source
        containing the data to make predictions.
    :param data_format: (str, default: `None`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
        `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
        `'stata'`, `'tsv'`.
    :param split: (str, default= `'full'`): if the input dataset contains
        a split column, this parameter indicates which split of the data
        to use. Possible values are `'full'`, `'training'`, `'validation'`, `'test'`.
    :param batch_size: (int, default: 128) size of batch to use when making
        predictions.

    # Return
    :return: (list) list of collected tensors.
    """
    self._check_initialization()

    # Preprocess the raw data into model-ready tensors using the metadata
    # captured at training time; ground-truth outputs are not needed here.
    logger.debug("Preprocessing")
    proc_dataset, _ = preprocess_for_prediction(  # TODO (Connor): Refactor to use self.config_obj
        self.config_obj.to_dict(),
        dataset=dataset,
        training_set_metadata=self.training_set_metadata,
        data_format=data_format,
        split=split,
        include_outputs=False,
    )

    logger.debug("Predicting")
    with self.backend.create_predictor(self.model, batch_size=batch_size) as predictor:
        return predictor.batch_collect_activations(layer_names, proc_dataset)
preprocess
¶
preprocess(dataset: str | dict | DataFrame | None = None, training_set: str | dict | DataFrame | None = None, validation_set: str | dict | DataFrame | None = None, test_set: str | dict | DataFrame | None = None, training_set_metadata: str | dict | None = None, data_format: str | None = None, skip_save_processed_input: bool = True, random_seed: int = default_random_seed, **kwargs) -> PreprocessedDataset
This function is used to preprocess data.
Args:¶
:param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing the entire dataset to be used in the experiment.
If it has a split column, it will be used for splitting
(0 for train, 1 for validation, 2 for test),
otherwise the dataset will be randomly split.
:param training_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing training data.
:param validation_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing validation data.
:param test_set: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing test data.
:param training_set_metadata: (Union[str, dict], default: `None`)
metadata JSON file or loaded metadata. Intermediate preprocessed
structure containing the mappings of the input
dataset created the first time an input file is used in the same
directory with the same name and a '.meta.json' extension.
:param data_format: (str, default: `None`) format to interpret data
sources. Will be inferred automatically if not specified. Valid
formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`,
`'feather'`, `'fwf'`,
`'hdf5'` (cache file produced during previous training),
`'html'` (file containing a single HTML `<table>`),
`'json'`, `'jsonl'`, `'parquet'`,
`'pickle'` (pickled Pandas DataFrame),
`'sas'`, `'spss'`, `'stata'`, `'tsv'`.
:param skip_save_processed_input: (bool, default: `False`) if input
dataset is provided it is preprocessed and cached by saving an HDF5
and JSON files to avoid running the preprocessing again. If this
parameter is `False`, the HDF5 and JSON file are not saved.
:param random_seed: (int, default: `42`) a random seed that will be
used anywhere there is a call to a random number generator: data
splitting, parameter initialization and training set shuffling
Returns:¶
:return: (PreprocessedDataset) data structure containing
`(proc_training_set, proc_validation_set, proc_test_set, training_set_metadata)`.
Raises:¶
RuntimeError: An error occurred while preprocessing the data. Examples include training dataset
being empty after preprocessing, lazy loading not being supported with RayBackend, etc.
Source code in ludwig/api.py
def preprocess(
    self,
    dataset: str | dict | pd.DataFrame | None = None,
    training_set: str | dict | pd.DataFrame | None = None,
    validation_set: str | dict | pd.DataFrame | None = None,
    test_set: str | dict | pd.DataFrame | None = None,
    training_set_metadata: str | dict | None = None,
    data_format: str | None = None,
    skip_save_processed_input: bool = True,
    random_seed: int = default_random_seed,
    **kwargs,
) -> PreprocessedDataset:
    """This function is used to preprocess data.

    # Inputs
    :param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing the entire dataset to be used in the experiment.
        If it has a split column, it will be used for splitting
        (0 for train, 1 for validation, 2 for test),
        otherwise the dataset will be randomly split.
    :param training_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing training data.
    :param validation_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing validation data.
    :param test_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing test data.
    :param training_set_metadata: (Union[str, dict], default: `None`)
        metadata JSON file or loaded metadata. Intermediate preprocessed
        structure containing the mappings of the input
        dataset created the first time an input file is used in the same
        directory with the same name and a '.meta.json' extension.
    :param data_format: (str, default: `None`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`,
        `'feather'`, `'fwf'`,
        `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`),
        `'json'`, `'jsonl'`, `'parquet'`,
        `'pickle'` (pickled Pandas DataFrame),
        `'sas'`, `'spss'`, `'stata'`, `'tsv'`.
    :param skip_save_processed_input: (bool, default: `True`) if an input
        dataset is provided it is preprocessed and cached by saving HDF5
        and JSON files so preprocessing does not have to run again. If this
        parameter is `True`, those cache files are not saved.
    :param random_seed: (int, default: `42`) a random seed that will be
        used anywhere there is a call to a random number generator: data
        splitting, parameter initialization and training set shuffling

    # Returns
    :return: (PreprocessedDataset) data structure containing
        `(proc_training_set, proc_validation_set, proc_test_set, training_set_metadata)`.

    # Raises
    RuntimeError: An error occurred while preprocessing the data. Examples include training dataset
        being empty after preprocessing, lazy loading not being supported with RayBackend, etc.
    """
    print_boxed("PREPROCESSING")

    # Notify registered callbacks before any preprocessing work starts.
    for callback in self.callbacks:
        callback.on_preprocess_start(self.config_obj.to_dict())

    preprocessing_params = get_preprocessing_params(self.config_obj)

    # Pre-bind the outputs so the `finally` block below can safely reference
    # them even when preprocessing fails part-way through.
    proc_training_set = proc_validation_set = proc_test_set = None
    try:
        with provision_preprocessing_workers(self.backend):
            # TODO (Connor): Refactor to use self.config_obj
            preprocessed_data = preprocess_for_training(
                self.config_obj.to_dict(),
                dataset=dataset,
                training_set=training_set,
                validation_set=validation_set,
                test_set=test_set,
                training_set_metadata=training_set_metadata,
                data_format=data_format,
                skip_save_processed_input=skip_save_processed_input,
                preprocessing_params=preprocessing_params,
                backend=self.backend,
                random_seed=random_seed,
                callbacks=self.callbacks,
            )

        proc_training_set, proc_validation_set, proc_test_set, training_set_metadata = preprocessed_data
        return PreprocessedDataset(proc_training_set, proc_validation_set, proc_test_set, training_set_metadata)
    except Exception as e:
        # Normalize all preprocessing failures to RuntimeError while keeping
        # the original traceback via exception chaining.
        raise RuntimeError(f"Caught exception during model preprocessing: {str(e)}") from e
    finally:
        # on_preprocess_end always fires, even on failure (its arguments may
        # then be None / the unprocessed metadata).
        for callback in self.callbacks:
            callback.on_preprocess_end(proc_training_set, proc_validation_set, proc_test_set, training_set_metadata)
load
staticmethod
¶
load(model_dir: str, logging_level: int = logging.ERROR, backend: Backend | str | None = None, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] = None, from_checkpoint: bool = False) -> LudwigModel
This function allows for loading pretrained models.
| PARAMETER | DESCRIPTION |
|---|---|
model_dir
|
(str) path to the directory containing the model.
If the model was trained by the
TYPE:
|
logging_level
|
(int, default: 40) log level that will be sent to stderr.
TYPE:
|
backend
|
(Union[Backend, str])
TYPE:
|
gpus
|
(Union[str, int, List[int]], default:
TYPE:
|
gpu_memory_limit
|
(float: default:
TYPE:
|
allow_parallel_threads
|
(bool, default:
TYPE:
|
callbacks
|
(list, default:
TYPE:
|
from_checkpoint
|
(bool, default:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
LudwigModel
|
(LudwigModel) a LudwigModel object Example usage |
Source code in ludwig/api.py
@staticmethod
def load(
    model_dir: str,
    logging_level: int = logging.ERROR,
    backend: Backend | str | None = None,
    gpus: str | int | list[int] | None = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] | None = None,
    from_checkpoint: bool = False,
) -> "LudwigModel":  # return is an instance of ludwig.api.LudwigModel class
    """This function allows for loading pretrained models.

    # Inputs
    :param model_dir: (str) path to the directory containing the model.
        If the model was trained by the `train` or `experiment` command,
        the model is in `results_dir/experiment_dir/model`.
    :param logging_level: (int, default: 40) log level that will be sent to
        stderr.
    :param backend: (Union[Backend, str]) `Backend` or string name
        of backend to use to execute preprocessing / training steps.
    :param gpus: (Union[str, int, List[int]], default: `None`) GPUs
        to use (it uses the same syntax of CUDA_VISIBLE_DEVICES)
    :param gpu_memory_limit: (float: default: `None`) maximum memory fraction
        [0, 1] allowed to allocate per GPU device.
    :param allow_parallel_threads: (bool, default: `True`) allow Torch to use
        multithreading parallelism to improve performance at the cost of
        determinism.
    :param callbacks: (list, default: `None`) a list of
        `ludwig.callbacks.Callback` objects that provide hooks into the
        Ludwig pipeline.
    :param from_checkpoint: (bool, default: `False`) if `True`, the model
        will be loaded from the latest checkpoint (training_checkpoints/)
        instead of the final model weights.

    # Return
    :return: (LudwigModel) a LudwigModel object

    # Example usage
    ```python
    ludwig_model = LudwigModel.load(model_dir)
    ```
    """
    # Initialize PyTorch before calling `broadcast()` to prevent initializing
    # Torch with default parameters
    backend_param = backend
    backend = initialize_backend(backend)
    backend.initialize_pytorch(
        gpus=gpus, gpu_memory_limit=gpu_memory_limit, allow_parallel_threads=allow_parallel_threads
    )

    # Read the saved hyperparameters once and distribute them through the backend.
    config = backend.broadcast_return(lambda: load_json(os.path.join(model_dir, MODEL_HYPERPARAMETERS_FILE_NAME)))

    # Upgrades deprecated fields and adds new required fields in case the config loaded from disk is old.
    config_obj = ModelConfig.from_dict(config)

    # Ensure that the original backend is used if it was specified in the config and user requests it
    if backend_param is None and "backend" in config:
        # Reset backend from config
        backend = initialize_backend(config.get("backend"))

    # initialize model
    ludwig_model = LudwigModel(
        config_obj.to_dict(),
        logging_level=logging_level,
        backend=backend,
        gpus=gpus,
        gpu_memory_limit=gpu_memory_limit,
        allow_parallel_threads=allow_parallel_threads,
        callbacks=callbacks,
    )

    # generate model from config
    set_saved_weights_in_checkpoint_flag(config_obj)
    ludwig_model.model = LudwigModel.create_model(config_obj)

    # load model weights
    ludwig_model.load_weights(model_dir, from_checkpoint)

    # If merge_and_unload was NOT performed before saving (i.e., adapter weights exist),
    # we need to merge them now for inference.
    if ludwig_model.is_merge_and_unload_set():
        weights_save_path = os.path.join(model_dir, MODEL_WEIGHTS_FILE_NAME)
        adapter_config_path = os.path.join(weights_save_path, "adapter_config.json")
        # Presence of adapter_config.json indicates the adapter weights were saved unmerged.
        if os.path.exists(adapter_config_path):
            ludwig_model.model.merge_and_unload(progressbar=config_obj.adapter.postprocessor.progressbar)

    # load train set metadata
    ludwig_model.training_set_metadata = backend.broadcast_return(
        lambda: load_metadata(os.path.join(model_dir, TRAIN_SET_METADATA_FILE_NAME))
    )

    return ludwig_model
load_weights
¶
load_weights(model_dir: str, from_checkpoint: bool = False) -> None
Loads weights from a pre-trained model.
| PARAMETER | DESCRIPTION |
|---|---|
model_dir
|
(str) filepath string to location of a pre-trained model
TYPE:
|
from_checkpoint
|
(bool, default:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
|
Source code in ludwig/api.py
def load_weights(
    self,
    model_dir: str,
    from_checkpoint: bool = False,
) -> None:
    """Loads weights from a pre-trained model.

    # Inputs
    :param model_dir: (str) filepath string to location of a pre-trained
        model
    :param from_checkpoint: (bool, default: `False`) if `True`, the model
        will be loaded from the latest checkpoint (training_checkpoints/)
        instead of the final model weights.

    # Return
    :return: `None`

    # Example usage
    ```python
    ludwig_model.load_weights(model_dir)
    ```
    """
    # Only the coordinator process reads weights from disk.
    if self.backend.is_coordinator():
        if from_checkpoint:
            # Resume from the latest training checkpoint rather than the
            # final saved weights; the trainer owns checkpoint handling.
            with self.backend.create_trainer(
                model=self.model,
                config=self.config_obj.trainer,
            ) as trainer:
                checkpoint = trainer.create_checkpoint_handle()
                training_checkpoints_path = os.path.join(model_dir, TRAINING_CHECKPOINTS_DIR_PATH)
                trainer.resume_weights_and_optimizer(training_checkpoints_path, checkpoint)
        else:
            # Load the final saved model weights directly.
            self.model.load(model_dir)

    # NOTE(review): presumably propagates the coordinator's weights to all
    # workers in distributed backends — confirm against Backend.sync_model.
    self.backend.sync_model(self.model)
save
¶
save(save_path: str) -> None
This function allows to save models on disk.
| PARAMETER | DESCRIPTION |
|---|---|
save_path
|
(str) path to the directory where the model is going to be saved. Both a JSON file containing the model architecture hyperparameters and checkpoints files containing model weights will be saved.
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
(None) |
Source code in ludwig/api.py
def save(self, save_path: str) -> None:
    """Persist the trained model to disk.

    # Inputs
    :param save_path: (str) path to the directory where the model is
        going to be saved. Both a JSON file containing the model
        architecture hyperparameters and checkpoint files containing
        model weights will be saved.

    # Return
    :return: (None) `None`

    # Example usage
    ```python
    ludwig_model.save(save_path)
    ```
    """
    self._check_initialization()

    # A saved model consists of three artifacts: the config, the weights,
    # and the training-set metadata used for preprocessing.
    self.save_config(save_path)
    self.model.save(save_path)

    metadata_path = os.path.join(save_path, TRAIN_SET_METADATA_FILE_NAME)
    save_json(metadata_path, self.training_set_metadata)
upload_to_hf_hub
staticmethod
¶
upload_to_hf_hub(repo_id: str, model_path: str, repo_type: str = 'model', private: bool = False, commit_message: str = 'Upload trained [Ludwig](https://ludwig.ai/latest/) model weights', commit_description: str | None = None) -> bool
Uploads trained model artifacts to the HuggingFace Hub.
| PARAMETER | DESCRIPTION |
|---|---|
repo_id
|
(
TYPE:
|
model_path
|
(
TYPE:
|
private
|
(
TYPE:
|
repo_type
|
(
TYPE:
|
commit_message
|
(
TYPE:
|
commit_description
|
(
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
(bool) True for success, False for failure. |
Source code in ludwig/api.py
@staticmethod
def upload_to_hf_hub(
    repo_id: str,
    model_path: str,
    repo_type: str = "model",
    private: bool = False,
    commit_message: str = "Upload trained [Ludwig](https://ludwig.ai/latest/) model weights",
    commit_description: str | None = None,
) -> bool:
    """Uploads trained model artifacts to the HuggingFace Hub.

    # Inputs
    :param repo_id: (`str`)
        A namespace (user or an organization) and a repo name separated
        by a `/`.
    :param model_path: (`str`)
        The path of the saved model. This is either (a) the folder where
        the 'model_weights' folder and the 'model_hyperparameters.json' file
        are stored, or (b) the parent of that folder.
    :param repo_type: (`str`, *optional*, defaults to `"model"`)
        Set to `"dataset"` or `"space"` if uploading to a dataset or
        space, `"model"` if uploading to a model.
    :param private: (`bool`, *optional*, defaults to `False`)
        Whether the model repo should be private.
    :param commit_message: (`str`, *optional*)
        The summary / title / first line of the generated commit.
    :param commit_description: (`str`, *optional*)
        The description of the generated commit.

    # Returns
    :return: (bool) True for success, False for failure.

    # Raises
    ValueError: if the model artifacts cannot be found at `model_path`
        or `model_path/model`.
    """
    # Case (a): `model_path` is the experiment directory containing a model subfolder.
    if os.path.exists(os.path.join(model_path, MODEL_FILE_NAME, MODEL_WEIGHTS_FILE_NAME)) and os.path.exists(
        os.path.join(model_path, MODEL_FILE_NAME, MODEL_HYPERPARAMETERS_FILE_NAME)
    ):
        experiment_path = model_path
    # Case (b): `model_path` is the model folder itself; upload its parent.
    elif os.path.exists(os.path.join(model_path, MODEL_WEIGHTS_FILE_NAME)) and os.path.exists(
        os.path.join(model_path, MODEL_HYPERPARAMETERS_FILE_NAME)
    ):
        experiment_path = os.path.dirname(model_path)
    else:
        # Use the same constants as the existence checks above so the error
        # message stays accurate if the artifact names ever change.
        raise ValueError(
            f"Can't find '{MODEL_WEIGHTS_FILE_NAME}' and '{MODEL_HYPERPARAMETERS_FILE_NAME}' either at "
            f"'{model_path}' or at '{model_path}/{MODEL_FILE_NAME}'"
        )

    model_service = get_upload_registry()["hf_hub"]
    hub: HuggingFaceHub = model_service()
    # Authenticate with the Hub before attempting the upload.
    hub.login()
    upload_status: bool = hub.upload(
        repo_id=repo_id,
        model_path=experiment_path,
        repo_type=repo_type,
        private=private,
        commit_message=commit_message,
        commit_description=commit_description,
    )
    return upload_status
save_config
¶
save_config(save_path: str) -> None
Save config to specified location.
| PARAMETER | DESCRIPTION |
|---|---|
save_path
|
(str) filepath string to save config as a JSON file.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
|
Source code in ludwig/api.py
def save_config(self, save_path: str) -> None:
    """Write the model config to the given directory as a JSON file.

    # Inputs
    :param save_path: (str) filepath string to save config as a
        JSON file.

    # Return
    :return: `None`
    """
    # Create the target directory if it does not already exist.
    os.makedirs(save_path, exist_ok=True)
    config_path = os.path.join(save_path, MODEL_HYPERPARAMETERS_FILE_NAME)
    save_json(config_path, self.config_obj.to_dict())
to_torchscript
¶
to_torchscript(model_only: bool = False, device: TorchDevice | None = None)
Converts the trained model to Torchscript.
| PARAMETER | DESCRIPTION |
|---|---|
| model_only | (bool, optional) If True, only the ECD model will be converted to Torchscript. Else, preprocessing and postprocessing steps will also be converted to Torchscript. |
| device | (TorchDevice, optional) If None, the model will be converted to Torchscript on the same device to ensure maximum model parity. |
| RETURNS | DESCRIPTION |
|---|---|
| torch.jit.ScriptModule | A torch.jit.ScriptModule that can be used to predict on a dictionary of inputs. |
Source code in ludwig/api.py
def to_torchscript(
    self,
    model_only: bool = False,
    device: TorchDevice | None = None,
):
    """Converts the trained model to Torchscript.

    # Inputs
    :param model_only (bool, optional): If True, only the ECD model will be converted to Torchscript. Else,
        preprocessing and postprocessing steps will also be converted to Torchscript.
    :param device (TorchDevice, optional): If None, the model will be converted to Torchscript on the same
        device to ensure maximum model parity.

    # Returns
    :return: A torch.jit.ScriptModule that can be used to predict on a dictionary of inputs.
    """
    device = DEVICE if device is None else device
    self._check_initialization()

    if model_only:
        return self.model.to_torchscript(device)

    # Bundle preprocessing, the model, and postprocessing into one scripted module.
    inference_module = InferenceModule.from_ludwig_model(
        self.model, self.config_obj.to_dict(), self.training_set_metadata, device=device
    )
    return torch.jit.script(inference_module)
save_torchscript
¶
save_torchscript(save_path: str, model_only: bool = False, device: TorchDevice | None = None)
Saves the Torchscript model to disk.
| PARAMETER | DESCRIPTION |
|---|---|
| save_path | (str) The path to the directory where the model will be saved. |
| model_only | (bool, optional) If True, only the ECD model will be converted to Torchscript. Else, the preprocessing and postprocessing steps will also be converted to Torchscript. |
| device | (TorchDevice, optional) If None, the model will be converted to Torchscript on the same device to ensure maximum model parity. |
| RETURNS | DESCRIPTION |
|---|---|
| None | `None` |
Source code in ludwig/api.py
def save_torchscript(
    self,
    save_path: str,
    model_only: bool = False,
    device: TorchDevice | None = None,
):
    """Saves the Torchscript model to disk.

    # Inputs
    :param save_path (str): The path to the directory where the model will be saved.
    :param model_only (bool, optional): If True, only the ECD model will be converted to Torchscript. Else, the
        preprocessing and postprocessing steps will also be converted to Torchscript.
    :param device (TorchDevice, optional): If None, the model will be converted to Torchscript on the same device
        to ensure maximum model parity.

    # Return
    :return: `None`
    """
    # Fall back to the module-level default device when none is given.
    device = DEVICE if device is None else device

    save_ludwig_model_for_inference(
        save_path,
        self.model,
        self.config_obj.to_dict(),
        self.training_set_metadata,
        model_only=model_only,
        device=device,
    )
free_gpu_memory
¶
free_gpu_memory()
Manually moves the model to CPU to force GPU memory to be freed.
For more context: https://discuss.pytorch.org/t/how-can-we-release-gpu-memory-cache/14530/35
Source code in ludwig/api.py
def free_gpu_memory(self):
    """Manually moves the model to CPU to force GPU memory to be freed.

    For more context: https://discuss.pytorch.org/t/how-can-we-release-gpu-memory-cache/14530/35
    """
    # No-op on machines without CUDA.
    if not torch.cuda.is_available():
        return
    self.model.model.to(torch.device("cpu"))
    torch.cuda.empty_cache()
create_model
staticmethod
¶
create_model(config_obj: ModelConfig | dict, random_seed: int = default_random_seed) -> BaseModel
Instantiates BaseModel object.
| PARAMETER | DESCRIPTION |
|---|---|
config_obj
|
(Union[Config, dict]) Ludwig config object
TYPE:
|
random_seed
|
(int, default: ludwig default random seed) Random seed used for weights initialization, splits and any other random function. # Return
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
BaseModel
|
(ludwig.models.BaseModel) Instance of the Ludwig model object. |
Source code in ludwig/api.py
@staticmethod
def create_model(config_obj: ModelConfig | dict, random_seed: int = default_random_seed) -> BaseModel:
    """Instantiates BaseModel object.

    # Inputs
    :param config_obj: (Union[ModelConfig, dict]) Ludwig config object.
    :param random_seed: (int, default: ludwig default random seed) Random seed
        used for weights initialization, splits and any other random function.

    # Return
    :return: (ludwig.models.BaseModel) Instance of the Ludwig model object.
    """
    # Accept a plain dict for convenience and normalize it to a ModelConfig.
    cfg = ModelConfig.from_dict(config_obj) if isinstance(config_obj, dict) else config_obj
    model_cls = get_from_registry(cfg.model_type, model_type_registry)
    return model_cls(cfg, random_seed=random_seed)
set_logging_level
staticmethod
¶
set_logging_level(logging_level: int) -> None
Sets level for log messages.
| PARAMETER | DESCRIPTION |
|---|---|
logging_level
|
(int) Set/Update the logging level. Use logging
constants like
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
|
Source code in ludwig/api.py
@staticmethod
def set_logging_level(logging_level: int) -> None:
    """Sets level for log messages.

    # Inputs
    :param logging_level: (int) Set/Update the logging level. Use logging
        constants like `logging.DEBUG`, `logging.INFO` and `logging.ERROR`.

    # Return
    :return: `None`
    """
    logging.getLogger("ludwig").setLevel(logging_level)

    # Progress bars are noise at WARNING severity and above, so suppress
    # them there and re-enable them for more verbose levels.
    quiet = logging_level in {logging.WARNING, logging.ERROR, logging.CRITICAL}
    set_disable_progressbar(quiet)
is_merge_and_unload_set
¶
is_merge_and_unload_set() -> bool
Check whether the encapsulated model is of type LLM and is configured to merge_and_unload QLoRA weights.
| RETURNS | DESCRIPTION |
|---|---|
bool
|
whether merge_and_unload should be done. |
Source code in ludwig/api.py
def is_merge_and_unload_set(self) -> bool:
    """Check whether the encapsulated model is of type LLM and is configured to merge_and_unload QLoRA weights.

    # Return
    :return (bool): whether merge_and_unload should be done.
    """
    # TODO: In the future, it may be possible to move up the model type check into the BaseModel class.
    # Only LLM models can carry adapter weights; every other model type never qualifies.
    if self.config_obj.model_type != MODEL_LLM:
        return False
    return self.model.is_merge_and_unload_set()
ludwig.api.kfold_cross_validate
¶
kfold_cross_validate(num_folds: int, config: dict | str, dataset: str = None, data_format: str = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_collect_predictions: bool = False, skip_collect_overall_stats: bool = False, output_directory: str = 'results', random_seed: int = default_random_seed, gpus: str | int | list[int] | None = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, backend: Backend | str | None = None, logging_level: int = logging.INFO, **kwargs) -> tuple[dict, dict]
Performs k-fold cross validation and returns result data structures.
| PARAMETER | DESCRIPTION |
|---|---|
num_folds
|
(int) number of folds to create for the cross-validation
TYPE:
|
config
|
(Union[dict, str]) model specification required to build a model. Parameter may be a dictionary or string specifying the file path to a yaml configuration file. Refer to the User Guide for details.
TYPE:
|
dataset
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
skip_save_training_description
|
(bool, default:
TYPE:
|
skip_save_training_statistics
|
(bool, default:
TYPE:
|
skip_save_model
|
(bool, default:
TYPE:
|
skip_save_progress
|
(bool, default:
TYPE:
|
skip_save_log
|
(bool, default:
TYPE:
|
skip_save_processed_input
|
(bool, default:
TYPE:
|
skip_save_predictions
|
(bool, default:
TYPE:
|
skip_save_eval_stats
|
(bool, default:
TYPE:
|
skip_collect_predictions
|
(bool, default:
TYPE:
|
skip_collect_overall_stats
|
(bool, default:
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
random_seed
|
(int, default:
TYPE:
|
gpus
|
(list, default:
TYPE:
|
gpu_memory_limit
|
(float: default:
TYPE:
|
allow_parallel_threads
|
(bool, default:
TYPE:
|
backend
|
(Union[Backend, str])
TYPE:
|
logging_level
|
(int, default: INFO) log level to send to stderr.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict, dict]
|
(tuple(kfold_cv_statistics, kfold_split_indices), dict) a tuple of
dictionaries |
Source code in ludwig/api.py
@PublicAPI
def kfold_cross_validate(
num_folds: int,
config: dict | str,
dataset: str = None,
data_format: str = None,
skip_save_training_description: bool = False,
skip_save_training_statistics: bool = False,
skip_save_model: bool = False,
skip_save_progress: bool = False,
skip_save_log: bool = False,
skip_save_processed_input: bool = False,
skip_save_predictions: bool = False,
skip_save_eval_stats: bool = False,
skip_collect_predictions: bool = False,
skip_collect_overall_stats: bool = False,
output_directory: str = "results",
random_seed: int = default_random_seed,
gpus: str | int | list[int] | None = None,
gpu_memory_limit: float | None = None,
allow_parallel_threads: bool = True,
backend: Backend | str | None = None,
logging_level: int = logging.INFO,
**kwargs,
) -> tuple[dict, dict]:
"""Performs k-fold cross validation and returns result data structures.
# Inputs
:param num_folds: (int) number of folds to create for the cross-validation
:param config: (Union[dict, str]) model specification
required to build a model. Parameter may be a dictionary or string
specifying the file path to a yaml configuration file. Refer to the
[User Guide](http://ludwig.ai/user_guide/#model-config)
for details.
:param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
source containing the entire dataset to be used for k_fold processing.
:param data_format: (str, default: `None`) format to interpret data
sources. Will be inferred automatically if not specified. Valid
formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
`'fwf'`,
`'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
`'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
`'stata'`, `'tsv'`. Currently `hdf5` format is not supported for
k_fold cross validation.
:param skip_save_training_description: (bool, default: `False`) disables
saving the description JSON file.
:param skip_save_training_statistics: (bool, default: `False`) disables
saving training statistics JSON file.
:param skip_save_model: (bool, default: `False`) disables
saving model weights and hyperparameters each time the model
improves. By default Ludwig saves model weights after each epoch
the validation metric improves, but if the model is really big
that can be time consuming. If you do not want to keep
the weights and just find out what performance a model can get
with a set of hyperparameters, use this parameter to skip it,
but the model will not be loadable later on and the returned model
will have the weights obtained at the end of training, instead of
the weights of the epoch with the best validation performance.
:param skip_save_progress: (bool, default: `False`) disables saving
progress each epoch. By default Ludwig saves weights and stats
after each epoch for enabling resuming of training, but if
the model is really big that can be time consuming and will uses
twice as much space, use this parameter to skip it, but training
cannot be resumed later on.
:param skip_save_log: (bool, default: `False`) disables saving TensorBoard
logs. By default Ludwig saves logs for the TensorBoard, but if it
is not needed turning it off can slightly increase the
overall speed.
:param skip_save_processed_input: (bool, default: `False`) if input
dataset is provided it is preprocessed and cached by saving an HDF5
and JSON files to avoid running the preprocessing again. If this
parameter is `False`, the HDF5 and JSON file are not saved.
:param skip_save_predictions: (bool, default: `False`) skips saving test
predictions CSV files.
:param skip_save_eval_stats: (bool, default: `False`) skips saving test
statistics JSON file.
:param skip_collect_predictions: (bool, default: `False`) skips collecting
post-processed predictions during eval.
:param skip_collect_overall_stats: (bool, default: `False`) skips collecting
overall stats during eval.
:param output_directory: (str, default: `'results'`) the directory that
will contain the training statistics, TensorBoard logs, the saved
model and the training progress files.
:param random_seed: (int, default: `42`) Random seed
used for weights initialization,
splits and any other random function.
:param gpus: (list, default: `None`) list of GPUs that are available
for training.
:param gpu_memory_limit: (float: default: `None`) maximum memory fraction
[0, 1] allowed to allocate per GPU device.
:param allow_parallel_threads: (bool, default: `True`) allow Torch to
use multithreading parallelism
to improve performance at the cost of determinism.
:param backend: (Union[Backend, str]) `Backend` or string name
of backend to use to execute preprocessing / training steps.
:param logging_level: (int, default: INFO) log level to send to stderr.
# Return
:return: (tuple(kfold_cv_statistics, kfold_split_indices), dict) a tuple of
dictionaries `kfold_cv_statistics`: contains metrics from cv run.
`kfold_split_indices`: indices to split training data into
training fold and test fold.
"""
# if config is a path, convert to dictionary
if isinstance(config, str): # assume path
config = load_yaml(config)
backend = initialize_backend(backend or config.get("backend"))
# check for k_fold
if num_folds is None:
raise ValueError("k_fold parameter must be specified")
logger.info(f"starting {num_folds:d}-fold cross validation")
# create output_directory if not available
if not os.path.isdir(output_directory):
os.mkdir(output_directory)
# prepare data for k-fold processing
# use Ludwig's utility to facilitate creating a dataframe
# that is used as the basis for creating folds
dataset, _, _, _ = load_dataset_uris(dataset, None, None, None, backend)
# determine data format of provided dataset
if not data_format or data_format == "auto":
data_format = figure_data_format(dataset)
data_df = load_dataset(dataset, data_format=data_format, df_lib=backend.df_engine.df_lib)
kfold_cv_stats = {}
kfold_split_indices = {}
for train_indices, test_indices, fold_num in generate_kfold_splits(data_df, num_folds, random_seed):
with tempfile.TemporaryDirectory() as temp_dir_name:
curr_train_df = data_df.iloc[train_indices]
curr_test_df = data_df.iloc[test_indices]
kfold_split_indices["fold_" + str(fold_num)] = {
"training_indices": train_indices,
"test_indices": test_indices,
}
# train and validate model on this fold
logger.info(f"training on fold {fold_num:d}")
model = LudwigModel(
config=config,
logging_level=logging_level,
backend=backend,
gpus=gpus,
gpu_memory_limit=gpu_memory_limit,
allow_parallel_threads=allow_parallel_threads,
)
eval_stats, train_stats, preprocessed_data, output_directory = model.experiment(
training_set=curr_train_df,
test_set=curr_test_df,
experiment_name="cross_validation",
model_name="fold_" + str(fold_num),
skip_save_training_description=skip_save_training_description,
skip_save_training_statistics=skip_save_training_statistics,
skip_save_model=skip_save_model,
skip_save_progress=skip_save_progress,
skip_save_log=skip_save_log,
skip_save_processed_input=skip_save_processed_input,
skip_save_predictions=skip_save_predictions,
skip_save_eval_stats=skip_save_eval_stats,
skip_collect_predictions=skip_collect_predictions,
skip_collect_overall_stats=skip_collect_overall_stats,
output_directory=os.path.join(temp_dir_name, "results"),
random_seed=random_seed,
)
# augment the training statistics with scoring metric from
# the hold out fold
if dataclasses.is_dataclass(train_stats):
train_stats_dict = dataclasses.asdict(train_stats)
elif hasattr(train_stats, "to_dict"):
train_stats_dict = train_stats.to_dict()
else:
train_stats_dict = vars(train_stats)
train_stats_dict["fold_eval_stats"] = eval_stats
# collect training statistics for this fold
kfold_cv_stats["fold_" + str(fold_num)] = train_stats_dict
# consolidate raw fold metrics across all folds
raw_kfold_stats = {}
for fold_name in kfold_cv_stats:
curr_fold_eval_stats = kfold_cv_stats[fold_name]["fold_eval_stats"]
for of_name in curr_fold_eval_stats:
if of_name not in raw_kfold_stats:
raw_kfold_stats[of_name] = {}
fold_eval_stats_of = curr_fold_eval_stats[of_name]
for metric in fold_eval_stats_of:
if metric not in {
"predictions",
"probabilities",
"confusion_matrix",
"overall_stats",
"per_class_stats",
"roc_curve",
"precision_recall_curve",
}:
if metric not in raw_kfold_stats[of_name]:
raw_kfold_stats[of_name][metric] = []
raw_kfold_stats[of_name][metric].append(fold_eval_stats_of[metric])
# calculate overall kfold statistics
overall_kfold_stats = {}
for of_name in raw_kfold_stats:
overall_kfold_stats[of_name] = {}
for metric in raw_kfold_stats[of_name]:
mean = np.mean(raw_kfold_stats[of_name][metric])
std = np.std(raw_kfold_stats[of_name][metric])
overall_kfold_stats[of_name][metric + "_mean"] = mean
overall_kfold_stats[of_name][metric + "_std"] = std
kfold_cv_stats["overall"] = overall_kfold_stats
logger.info(f"completed {num_folds:d}-fold cross validation")
return kfold_cv_stats, kfold_split_indices
ludwig.hyperopt.run.hyperopt
¶
hyperopt(config: str | dict, dataset: str | dict | DataFrame = None, training_set: str | dict | DataFrame = None, validation_set: str | dict | DataFrame = None, test_set: str | dict | DataFrame = None, training_set_metadata: str | dict = None, data_format: str = None, experiment_name: str = 'hyperopt', model_name: str = 'run', resume: bool | None = None, skip_save_training_description: bool = False, skip_save_training_statistics: bool = False, skip_save_model: bool = False, skip_save_progress: bool = False, skip_save_log: bool = False, skip_save_processed_input: bool = True, skip_save_unprocessed_output: bool = False, skip_save_predictions: bool = False, skip_save_eval_stats: bool = False, skip_save_hyperopt_statistics: bool = False, output_directory: str = 'results', gpus: str | int | list[int] = None, gpu_memory_limit: float | None = None, allow_parallel_threads: bool = True, callbacks: list[Callback] = None, tune_callbacks: list[Callback] = None, backend: Backend | str = None, random_seed: int = default_random_seed, hyperopt_log_verbosity: int = 3, **kwargs) -> HyperoptResults
This method performs a hyperparameter optimization.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
(Union[str, dict]) config which defines
the different parameters of the model, features, preprocessing and
training. If
TYPE:
|
dataset
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
validation_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
test_set
|
(Union[str, dict, pandas.DataFrame], default:
TYPE:
|
training_set_metadata
|
(Union[str, dict], default:
TYPE:
|
data_format
|
(str, default:
TYPE:
|
experiment_name
|
(str, default:
TYPE:
|
model_name
|
(str, default:
TYPE:
|
resume
|
(bool) If true, continue hyperopt from the state of the previous run in the output directory with the same experiment name. If false, will create new trials, ignoring any previous state, even if they exist in the output_directory. By default, will attempt to resume if there is already an existing experiment with the same name, and will create new trials if not.
TYPE:
|
skip_save_training_description
|
(bool, default:
TYPE:
|
skip_save_training_statistics
|
(bool, default:
TYPE:
|
skip_save_model
|
(bool, default:
TYPE:
|
skip_save_progress
|
(bool, default:
TYPE:
|
skip_save_log
|
(bool, default:
TYPE:
|
skip_save_processed_input
|
(bool, default:
TYPE:
|
skip_save_unprocessed_output
|
(bool, default:
TYPE:
|
skip_save_predictions
|
(bool, default:
TYPE:
|
skip_save_eval_stats
|
(bool, default:
TYPE:
|
skip_save_hyperopt_statistics
|
(bool, default:
TYPE:
|
output_directory
|
(str, default:
TYPE:
|
gpus
|
(list, default:
TYPE:
|
gpu_memory_limit
|
(float: default:
TYPE:
|
allow_parallel_threads
|
(bool, default:
TYPE:
|
callbacks
|
(list, default:
TYPE:
|
backend
|
(Union[Backend, str])
TYPE:
|
random_seed
|
(int, default: `42`) random seed used for weights initialization, splits and any other random function.
TYPE:
|
hyperopt_log_verbosity
|
(int, default: `3`) controls verbosity of ray tune log messages. Valid values: 0 = silent, 1 = only status updates, 2 = status and brief trial results, 3 = status and detailed trial results.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
HyperoptResults
|
(List[dict]) List of results for each trial, ordered by descending performance on the target metric. |
Source code in ludwig/hyperopt/run.py
def hyperopt(
    config: str | dict,
    dataset: str | dict | pd.DataFrame = None,
    training_set: str | dict | pd.DataFrame = None,
    validation_set: str | dict | pd.DataFrame = None,
    test_set: str | dict | pd.DataFrame = None,
    training_set_metadata: str | dict = None,
    data_format: str = None,
    experiment_name: str = "hyperopt",
    model_name: str = "run",
    resume: bool | None = None,
    skip_save_training_description: bool = False,
    skip_save_training_statistics: bool = False,
    skip_save_model: bool = False,
    skip_save_progress: bool = False,
    skip_save_log: bool = False,
    skip_save_processed_input: bool = True,
    skip_save_unprocessed_output: bool = False,
    skip_save_predictions: bool = False,
    skip_save_eval_stats: bool = False,
    skip_save_hyperopt_statistics: bool = False,
    output_directory: str = "results",
    gpus: str | int | list[int] = None,
    gpu_memory_limit: float | None = None,
    allow_parallel_threads: bool = True,
    callbacks: list[Callback] = None,
    tune_callbacks: list[TuneCallback] = None,
    backend: Backend | str = None,
    random_seed: int = default_random_seed,
    hyperopt_log_verbosity: int = 3,
    **kwargs,
) -> HyperoptResults:
    """This method performs a hyperparameter optimization.

    # Inputs

    :param config: (Union[str, dict]) config which defines
        the different parameters of the model, features, preprocessing and
        training. If `str`, filepath to yaml configuration file.
    :param dataset: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing the entire dataset to be used in the experiment.
        If it has a split column, it will be used for splitting (0 for train,
        1 for validation, 2 for test), otherwise the dataset will be
        randomly split.
    :param training_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing training data.
    :param validation_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing validation data.
    :param test_set: (Union[str, dict, pandas.DataFrame], default: `None`)
        source containing test data.
    :param training_set_metadata: (Union[str, dict], default: `None`)
        metadata JSON file or loaded metadata. Intermediate preprocessed
        structure containing the mappings of the input
        dataset created the first time an input file is used in the same
        directory with the same name and a '.meta.json' extension.
    :param data_format: (str, default: `None`) format to interpret data
        sources. Will be inferred automatically if not specified. Valid
        formats are `'auto'`, `'csv'`, `'df'`, `'dict'`, `'excel'`, `'feather'`,
        `'fwf'`, `'hdf5'` (cache file produced during previous training),
        `'html'` (file containing a single HTML `<table>`), `'json'`, `'jsonl'`,
        `'parquet'`, `'pickle'` (pickled Pandas DataFrame), `'sas'`, `'spss'`,
        `'stata'`, `'tsv'`.
    :param experiment_name: (str, default: `'hyperopt'`) name for
        the experiment.
    :param model_name: (str, default: `'run'`) name of the model that is
        being used.
    :param resume: (bool) If true, continue hyperopt from the state of the previous
        run in the output directory with the same experiment name. If false, will create
        new trials, ignoring any previous state, even if they exist in the output_directory.
        By default, will attempt to resume if there is already an existing experiment with
        the same name, and will create new trials if not.
    :param skip_save_training_description: (bool, default: `False`) disables
        saving the description JSON file.
    :param skip_save_training_statistics: (bool, default: `False`) disables
        saving training statistics JSON file.
    :param skip_save_model: (bool, default: `False`) disables
        saving model weights and hyperparameters each time the model
        improves. By default Ludwig saves model weights after each epoch
        the validation metric improves, but if the model is really big
        that can be time consuming. If you do not want to keep
        the weights and just find out what performance a model can get
        with a set of hyperparameters, use this parameter to skip it,
        but the model will not be loadable later on and the returned model
        will have the weights obtained at the end of training, instead of
        the weights of the epoch with the best validation performance.
    :param skip_save_progress: (bool, default: `False`) disables saving
        progress each epoch. By default Ludwig saves weights and stats
        after each epoch for enabling resuming of training, but if
        the model is really big that can be time consuming and will uses
        twice as much space, use this parameter to skip it, but training
        cannot be resumed later on.
    :param skip_save_log: (bool, default: `False`) disables saving
        TensorBoard logs. By default Ludwig saves logs for the TensorBoard,
        but if it is not needed turning it off can slightly increase the
        overall speed.
    :param skip_save_processed_input: (bool, default: `True`) if input
        dataset is provided it is preprocessed and cached by saving an HDF5
        and JSON files to avoid running the preprocessing again. If this
        parameter is `False`, the HDF5 and JSON file are not saved.
    :param skip_save_unprocessed_output: (bool, default: `False`) by default
        predictions and their probabilities are saved in both raw
        unprocessed numpy files containing tensors and as postprocessed
        CSV files (one for each output feature). If this parameter is True,
        only the CSV ones are saved and the numpy ones are skipped.
    :param skip_save_predictions: (bool, default: `False`) skips saving test
        predictions CSV files.
    :param skip_save_eval_stats: (bool, default: `False`) skips saving test
        statistics JSON file.
    :param skip_save_hyperopt_statistics: (bool, default: `False`) skips saving
        hyperopt stats file.
    :param output_directory: (str, default: `'results'`) the directory that
        will contain the training statistics, TensorBoard logs, the saved
        model and the training progress files.
    :param gpus: (list, default: `None`) list of GPUs that are available
        for training.
    :param gpu_memory_limit: (float: default: `None`) maximum memory fraction
        [0, 1] allowed to allocate per GPU device.
    :param allow_parallel_threads: (bool, default: `True`) allow PyTorch
        to use multithreading parallelism to improve performance at
        the cost of determinism.
    :param callbacks: (list, default: `None`) a list of
        `ludwig.callbacks.Callback` objects that provide hooks into the
        Ludwig pipeline.
    :param tune_callbacks: (list, default: `None`) a list of Ray Tune
        callbacks that are forwarded to the hyperopt executor and invoked
        by Ray Tune during trial execution.
    :param backend: (Union[Backend, str]) `Backend` or string name
        of backend to use to execute preprocessing / training steps.
    :param random_seed: (int: default: 42) random seed used for weights
        initialization, splits and any other random function.
    :param hyperopt_log_verbosity: (int: default: 3) controls verbosity of
        ray tune log messages. Valid values: 0 = silent, 1 = only status updates,
        2 = status and brief trial results, 3 = status and detailed trial results.

    # Return

    :return: (List[dict]) List of results for each trial, ordered by
        descending performance on the target metric.
    """
    # Imported lazily to avoid requiring Ray at module import time.
    from ludwig.hyperopt.execution import get_build_hyperopt_executor, RayTuneExecutor

    # check if config is a path or a dict
    if isinstance(config, str):  # assume path
        with open_file(config, "r") as def_file:
            config_dict = yaml.safe_load(def_file)
    else:
        config_dict = config

    if HYPEROPT not in config_dict:
        raise ValueError("Hyperopt Section not present in config")

    # backwards compatibility
    upgraded_config = upgrade_config_dict_to_latest_version(config_dict)

    # Initialize config object
    config_obj = ModelConfig.from_dict(upgraded_config)

    # Retain pre-merged config for hyperopt schema generation
    premerged_config = copy.deepcopy(upgraded_config)

    # Get full config with defaults
    full_config = config_obj.to_dict()  # TODO (Connor): Refactor to use config object
    hyperopt_config = full_config[HYPEROPT]

    # Explicitly default to a local backend to avoid picking up Ray
    # backend from the environment.
    backend = backend or config_dict.get("backend") or "local"
    backend = initialize_backend(backend)

    update_hyperopt_params_with_defaults(hyperopt_config)

    # Check if all features are grid type parameters and log UserWarning if needed
    log_warning_if_all_grid_type_parameters(hyperopt_config)

    # Infer max concurrent trials
    if hyperopt_config[EXECUTOR].get(MAX_CONCURRENT_TRIALS) == AUTO:
        hyperopt_config[EXECUTOR][MAX_CONCURRENT_TRIALS] = backend.max_concurrent_trials(hyperopt_config)
        logger.info(f"Setting max_concurrent_trials to {hyperopt_config[EXECUTOR][MAX_CONCURRENT_TRIALS]}")

    # Print hyperopt config
    logger.info("Hyperopt Config")
    logger.info(pformat(hyperopt_config, indent=4))
    logger.info("\n")

    search_alg = hyperopt_config[SEARCH_ALG]
    executor = hyperopt_config[EXECUTOR]
    parameters = hyperopt_config[PARAMETERS]
    split = hyperopt_config[SPLIT]
    output_feature = hyperopt_config["output_feature"]
    metric = hyperopt_config[METRIC]
    goal = hyperopt_config[GOAL]

    ######################
    # check validity of output_feature / metric/ split combination
    ######################
    splitter = get_splitter(**full_config[PREPROCESSING]["split"])
    if split == TRAINING:
        if training_set is None and not splitter.has_split(0):
            raise ValueError(
                'The data for the specified split for hyperopt "{}" '
                "was not provided, "
                "or the split amount specified in the preprocessing section "
                "of the config is not greater than 0".format(split)
            )
    elif split == VALIDATION:
        if validation_set is None and not splitter.has_split(1):
            raise ValueError(
                'The data for the specified split for hyperopt "{}" '
                "was not provided, "
                "or the split amount specified in the preprocessing section "
                "of the config is not greater than 0".format(split)
            )
    elif split == TEST:
        if test_set is None and not splitter.has_split(2):
            raise ValueError(
                'The data for the specified split for hyperopt "{}" '
                "was not provided, "
                "or the split amount specified in the preprocessing section "
                "of the config is not greater than 0".format(split)
            )
    else:
        raise ValueError(
            'unrecognized hyperopt split "{}". ' "Please provide one of: {}".format(split, {TRAINING, VALIDATION, TEST})
        )
    if output_feature == COMBINED:
        if metric != LOSS:
            raise ValueError('The only valid metric for "combined" output feature is "loss"')
    else:
        output_feature_names = {of[NAME] for of in full_config[OUTPUT_FEATURES]}
        if output_feature not in output_feature_names:
            raise ValueError(
                'The output feature specified for hyperopt "{}" '
                "cannot be found in the config. "
                'Available ones are: {} and "combined"'.format(output_feature, output_feature_names)
            )

    hyperopt_executor = get_build_hyperopt_executor(executor[TYPE])(
        parameters, output_feature, metric, goal, split, search_alg=search_alg, **executor
    )

    # NOTE: the backend was already resolved and initialized above; the
    # original code re-initialized it here a second time, which was redundant.
    if not (
        isinstance(backend, LocalBackend)
        or (isinstance(hyperopt_executor, RayTuneExecutor) and isinstance(backend, RayBackend))
    ):
        raise ValueError(
            "Hyperopt requires using a `local` backend at this time, or " "`ray` backend with `ray` executor."
        )

    for callback in callbacks or []:
        callback.on_hyperopt_init(experiment_name)

    if not should_tune_preprocessing(full_config):
        # preprocessing is not being tuned, so generate it once before starting trials
        for callback in callbacks or []:
            callback.on_hyperopt_preprocessing_start(experiment_name)

        model = LudwigModel(
            config=full_config,
            backend=backend,
            gpus=gpus,
            gpu_memory_limit=gpu_memory_limit,
            allow_parallel_threads=allow_parallel_threads,
            callbacks=callbacks,
        )

        training_set, validation_set, test_set, training_set_metadata = model.preprocess(
            dataset=dataset,
            training_set=training_set,
            validation_set=validation_set,
            test_set=test_set,
            training_set_metadata=training_set_metadata,
            data_format=data_format,
            skip_save_processed_input=skip_save_processed_input,
            random_seed=random_seed,
        )
        # The raw dataset has been split into cached sets above; clear it so
        # trials consume the preprocessed splits instead.
        dataset = None

        dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)

        logger.info("\nDataset Statistics")
        logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))

        for callback in callbacks or []:
            callback.on_hyperopt_preprocessing_end(experiment_name)

    for callback in callbacks or []:
        callback.on_hyperopt_start(experiment_name)

    hyperopt_results = hyperopt_executor.execute(
        premerged_config,
        dataset=dataset,
        training_set=training_set,
        validation_set=validation_set,
        test_set=test_set,
        training_set_metadata=training_set_metadata,
        data_format=data_format,
        experiment_name=experiment_name,
        model_name=model_name,
        resume=resume,
        skip_save_training_description=skip_save_training_description,
        skip_save_training_statistics=skip_save_training_statistics,
        skip_save_model=skip_save_model,
        skip_save_progress=skip_save_progress,
        skip_save_log=skip_save_log,
        skip_save_processed_input=skip_save_processed_input,
        skip_save_unprocessed_output=skip_save_unprocessed_output,
        skip_save_predictions=skip_save_predictions,
        skip_save_eval_stats=skip_save_eval_stats,
        output_directory=output_directory,
        gpus=gpus,
        gpu_memory_limit=gpu_memory_limit,
        allow_parallel_threads=allow_parallel_threads,
        callbacks=callbacks,
        tune_callbacks=tune_callbacks,
        backend=backend,
        random_seed=random_seed,
        hyperopt_log_verbosity=hyperopt_log_verbosity,
        **kwargs,
    )

    if backend.is_coordinator():
        print_hyperopt_results(hyperopt_results)

        if not skip_save_hyperopt_statistics:
            with backend.storage.artifacts.use_credentials():
                results_directory = os.path.join(output_directory, experiment_name)
                makedirs(results_directory, exist_ok=True)
                hyperopt_stats = {
                    "hyperopt_config": hyperopt_config,
                    "hyperopt_results": [t.to_dict() for t in hyperopt_results.ordered_trials],
                }
                save_hyperopt_stats(hyperopt_stats, results_directory)
                logger.info(f"Hyperopt stats saved to: {results_directory}")

    for callback in callbacks or []:
        callback.on_hyperopt_end(experiment_name)
        callback.on_hyperopt_finish(experiment_name)

    logger.info("Finished hyperopt")
    return hyperopt_results