raytune_learner

Ray Tune wrapper and trainable model classes for hyperparameter optimization.

Classes:

  • CheckpointDict

    Dictionary type for checkpoint data.

  • TuneModel

    Trainable model class for Ray Tune.

  • TuneWrapper

    Wrapper class for Ray Tune hyperparameter optimization.

CheckpointDict

Bases: TypedDict

Dictionary type for checkpoint data.
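
The type itself is not expanded here, but judging from save_checkpoint and load_checkpoint below it carries only the checkpoint directory. A minimal sketch of the assumed shape (an inference, not the literal source definition):

from typing import TypedDict


class CheckpointDict(TypedDict):
    """Assumed shape, inferred from save_checkpoint/load_checkpoint below."""

    checkpoint_dir: str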

TuneModel

Bases: Trainable

Trainable model class for Ray Tune.

Methods:

  • export_model

    Export model to safetensors format.

  • load_checkpoint

    Load model and optimizer state from checkpoint.

  • objective

    Compute the objective metric(s) for the tuning process.

  • save_checkpoint

    Save model and optimizer state to checkpoint.

  • setup

    Get the model, loss function(s), optimizer, train and test data from the config.

  • step

    For each batch in the training data, calculate the loss and update the model parameters.

export_model

export_model(export_dir: str | None = None) -> None

Export model to safetensors format.

Source code in src/stimulus/learner/raytune_learner.py, lines 294-298
def export_model(self, export_dir: str | None = None) -> None:  # type: ignore[override]
    """Export model to safetensors format."""
    if export_dir is None:
        return
    safe_save_model(self.model, os.path.join(export_dir, "model.safetensors"))

load_checkpoint

load_checkpoint(checkpoint: dict[Any, Any] | None) -> None

Load model and optimizer state from checkpoint.

Source code in src/stimulus/learner/raytune_learner.py, lines 300-306
def load_checkpoint(self, checkpoint: dict[Any, Any] | None) -> None:
    """Load model and optimizer state from checkpoint."""
    if checkpoint is None:
        return
    checkpoint_dir = checkpoint["checkpoint_dir"]
    self.model = safe_load_model(self.model, os.path.join(checkpoint_dir, "model.safetensors"))
    self.optimizer.load_state_dict(torch.load(os.path.join(checkpoint_dir, "optimizer.pt")))

objective

objective() -> dict[str, float]

Compute the objective metric(s) for the tuning process.

Source code in src/stimulus/learner/raytune_learner.py, lines 275-292
def objective(self) -> dict[str, float]:
    """Compute the objective metric(s) for the tuning process."""
    metrics = [
        "loss",
        "rocauc",
        "prauc",
        "mcc",
        "f1score",
        "precision",
        "recall",
        "spearmanr",
    ]  # TODO maybe we report only a subset of metrics, given certain criteria (eg. if classification or regression)
    predict_val = PredictWrapper(self.model, self.validation, loss_dict=self.loss_dict)
    predict_train = PredictWrapper(self.model, self.training, loss_dict=self.loss_dict)
    return {
        **{"val_" + metric: value for metric, value in predict_val.compute_metrics(metrics).items()},
        **{"train_" + metric: value for metric, value in predict_train.compute_metrics(metrics).items()},
    }
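
Every metric in the list above is reported twice, once per split, with a val_ or train_ prefix. A small sketch of the key names the returned dictionary contains:

# Key names produced by objective(); each metric appears once per split.
metrics = ["loss", "rocauc", "prauc", "mcc", "f1score", "precision", "recall", "spearmanr"]
expected_keys = [f"val_{m}" for m in metrics] + [f"train_{m}" for m in metrics]
# e.g. "val_loss", "train_loss", "val_rocauc", "train_rocauc", ...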

save_checkpoint

save_checkpoint(checkpoint_dir: str) -> dict[Any, Any]

Save model and optimizer state to checkpoint.

Source code in src/stimulus/learner/raytune_learner.py, lines 308-312
def save_checkpoint(self, checkpoint_dir: str) -> dict[Any, Any]:
    """Save model and optimizer state to checkpoint."""
    safe_save_model(self.model, os.path.join(checkpoint_dir, "model.safetensors"))
    torch.save(self.optimizer.state_dict(), os.path.join(checkpoint_dir, "optimizer.pt"))
    return {"checkpoint_dir": checkpoint_dir}
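
Ray Tune normally invokes these two hooks itself, but the round-trip they implement can be sketched directly. This is illustrative only and assumes trainable is an already-initialized TuneModel instance:

# Hypothetical round-trip, assuming `trainable` is an initialized TuneModel.
import tempfile

with tempfile.TemporaryDirectory() as ckpt_dir:
    checkpoint = trainable.save_checkpoint(ckpt_dir)  # writes model.safetensors and optimizer.pt
    trainable.load_checkpoint(checkpoint)             # restores model weights and optimizer state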

setup

setup(config: dict[Any, Any]) -> None

Get the model, loss function(s), optimizer, train and test data from the config.

Source code in src/stimulus/learner/raytune_learner.py, lines 195-261
def setup(self, config: dict[Any, Any]) -> None:
    """Get the model, loss function(s), optimizer, train and test data from the config."""
    # Set the seeds a second time; they were first set during TuneWrapper initialization
    set_general_seeds(self.config["ray_worker_seed"])

    # Initialize model with the config params
    self.model = config["model"](**config["network_params"])

    # Get the loss function(s) from the config model params
    self.loss_dict = config["loss_params"]
    for key, loss_fn in self.loss_dict.items():
        try:
            self.loss_dict[key] = getattr(nn, loss_fn)()
        except AttributeError as err:
            raise ValueError(
                f"Invalid loss function: {loss_fn}, check PyTorch for documentation on available loss functions",
            ) from err

    # get the optimizer parameters
    optimizer_lr = config["optimizer_params"]["lr"]
    self.optimizer = getattr(optim, config["optimizer_params"]["method"])(
        self.model.parameters(),
        lr=optimizer_lr,
    )

    # get step size from the config
    self.step_size = config["tune"]["step_size"]

    # Get datasets from Ray's object store
    training, validation = ray.get(self.config["_training_ref"]), ray.get(self.config["_validation_ref"])

    # use dataloader on training/validation data
    self.batch_size = config["data_params"]["batch_size"]
    self.training = DataLoader(
        training,
        batch_size=self.batch_size,
        shuffle=True,
    )
    self.validation = DataLoader(
        validation,
        batch_size=self.batch_size,
        shuffle=True,
    )

    # Debug section: build the path of a dedicated per-worker directory inside Ray_results/<tune_model_run_specific_dir>
    debug_dir = os.path.join(
        config["tune_run_path"],
        "debug",
        ("worker_with_seed_" + str(self.config["ray_worker_seed"])),
    )
    if config["_debug"]:
        # create a dedicated directory that is worker/trial/experiment specific
        os.makedirs(debug_dir)
        seed_filename = os.path.join(debug_dir, "seeds.txt")

        # save the initialized model weights
        self.export_model(export_dir=debug_dir)

        # save the seeds
        with open(seed_filename, "a") as seed_f:
            # The current seed cannot be retrieved once it is set (for Python, NumPy, or torch), so we draw five random numbers instead; given the same seed, the first draw is always the same.
            python_values = random.sample(range(100), 5)
            numpy_values = list(np.random.randint(0, 100, size=5))
            torch_values = torch.randint(0, 100, (5,)).tolist()
            seed_f.write(
                f"python drawn numbers : {python_values}\nnumpy drawn numbers : {numpy_values}\ntorch drawn numbers : {torch_values}\n",
            )
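
setup() reads its hyperparameters from the Ray Tune param_space assembled by TuneWrapper (see below). A hedged sketch of the keys it expects, with placeholder values and a hypothetical model class name:

# Sketch of the config keys read by setup(); values are illustrative placeholders.
config = {
    "model": MyModel,                              # nn.Module subclass (hypothetical name)
    "network_params": {"hidden_size": 64},         # kwargs passed to the model constructor
    "loss_params": {"loss_fn": "MSELoss"},         # names of torch.nn loss classes
    "optimizer_params": {"method": "Adam", "lr": 1e-3},
    "data_params": {"batch_size": 32},
    "tune": {"step_size": 2},                      # training epochs per Ray Tune step()
    # Added internally by TuneWrapper / tuner_initialization:
    # "tune_run_path", "_debug", "ray_worker_seed", "_training_ref", "_validation_ref"
}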

step

step() -> dict

For each batch in the training data, calculate the loss and update the model parameters.

This calculation is performed based on the model's batch function. At the end, return the objective metric(s) for the tuning process.

Source code in src/stimulus/learner/raytune_learner.py, lines 263-273
def step(self) -> dict:
    """For each batch in the training data, calculate the loss and update the model parameters.

    This calculation is performed based on the model's batch function.
    At the end, return the objective metric(s) for the tuning process.
    """
    for _step_size in range(self.step_size):
        for x, y, _meta in self.training:
            # The loss dict could instead be unpacked with ** and handled as **kwargs in the function signature; to be decided, but the current form reads more clearly.
            self.model.batch(x=x, y=y, optimizer=self.optimizer, **self.loss_dict)
    return self.objective()

TuneWrapper

TuneWrapper(
    model_config: RayTuneModel,
    data_config_path: str,
    model_class: Module,
    data_path: str,
    encoder_loader: EncoderLoader,
    seed: int,
    ray_results_dir: Optional[str] = None,
    tune_run_name: Optional[str] = None,
    *,
    debug: bool = False,
    autoscaler: bool = False
)

Wrapper class for Ray Tune hyperparameter optimization.

Methods:

  • tune

    Run the tuning process.

  • tuner_initialization

    Prepare the tuner with the configs.

Source code in src/stimulus/learner/raytune_learner.py, lines 35-111
def __init__(
    self,
    model_config: RayTuneModel,
    data_config_path: str,
    model_class: nn.Module,
    data_path: str,
    encoder_loader: EncoderLoader,
    seed: int,
    ray_results_dir: Optional[str] = None,
    tune_run_name: Optional[str] = None,
    *,
    debug: bool = False,
    autoscaler: bool = False,
) -> None:
    """Initialize the TuneWrapper with the paths to the config, model, and data."""
    self.config = model_config.model_dump()

    # set all general seeds: python, numpy and torch.
    set_general_seeds(seed)

    # build the tune config:
    try:
        scheduler_class = getattr(
            tune.schedulers,
            model_config.tune.scheduler.name,
        )  # todo, do this in RayConfigLoader
    except AttributeError as err:
        raise ValueError(
            f"Invalid scheduler: {model_config.tune.scheduler.name}, check Ray Tune for documentation on available schedulers",
        ) from err

    scheduler = scheduler_class(**model_config.tune.scheduler.params)
    self.tune_config = tune.TuneConfig(
        metric=model_config.tune.tune_params.metric,
        mode=model_config.tune.tune_params.mode,
        num_samples=model_config.tune.tune_params.num_samples,
        scheduler=scheduler,
    )

    # build the run config
    self.run_config = train.RunConfig(
        name=tune_run_name
        if tune_run_name is not None
        else "TuneModel_" + datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S"),
        storage_path=ray_results_dir,
        checkpoint_config=train.CheckpointConfig(checkpoint_at_end=True),
        stop=model_config.tune.run_params.stop,
    )

    # add the data path to the config
    if not os.path.exists(data_path):
        raise ValueError("Data path does not exist. Given path:" + data_path)
    self.config["data_path"] = os.path.abspath(data_path)

    # Set up tune_run path
    if ray_results_dir is None:
        ray_results_dir = os.environ.get("HOME", "")
    self.config["tune_run_path"] = os.path.join(
        ray_results_dir,
        tune_run_name
        if tune_run_name is not None
        else "TuneModel_" + datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S"),
    )
    self.config["_debug"] = debug
    self.config["model"] = model_class
    self.config["encoder_loader"] = encoder_loader
    self.config["ray_worker_seed"] = tune.randint(0, 1000)

    self.gpu_per_trial = model_config.tune.gpu_per_trial
    self.cpu_per_trial = model_config.tune.cpu_per_trial

    self.tuner = self.tuner_initialization(
        data_config_path=data_config_path,
        data_path=data_path,
        encoder_loader=encoder_loader,
        autoscaler=autoscaler,
    )

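A hedged end-to-end sketch of constructing the wrapper. The import path follows the source location shown above; model_config, encoder_loader, and MyModel are assumed to have been built elsewhere, and the file paths are placeholders:

# Assumes model_config (RayTuneModel), encoder_loader (EncoderLoader) and MyModel
# (an nn.Module subclass) already exist; paths are placeholders.
from stimulus.learner.raytune_learner import TuneWrapper

wrapper = TuneWrapper(
    model_config=model_config,
    data_config_path="configs/data.yaml",
    model_class=MyModel,
    data_path="data/dataset.csv",
    encoder_loader=encoder_loader,
    seed=42,
    ray_results_dir=None,   # tune_run_path then defaults to $HOME/<run name>
    tune_run_name=None,     # defaults to a timestamped "TuneModel_..." name
    debug=False,
    autoscaler=False,
)
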
tune

tune() -> ResultGrid

Run the tuning process.

Source code in src/stimulus/learner/raytune_learner.py, lines 187-189
def tune(self) -> ray.tune.ResultGrid:
    """Run the tuning process."""
    return self.tuner.fit()
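
The returned ray.tune.ResultGrid can be inspected with Ray's standard API. A small sketch, assuming wrapper is the TuneWrapper constructed above and that val_loss was the configured metric:

# Run the search and pick the best trial by validation loss (illustrative choice).
results = wrapper.tune()
best = results.get_best_result(metric="val_loss", mode="min")
print(best.config)               # hyperparameters of the best trial
print(best.metrics["val_loss"])  # its reported validation loss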

tuner_initialization

tuner_initialization(
    data_config_path: str,
    data_path: str,
    encoder_loader: EncoderLoader,
    *,
    autoscaler: bool = False
) -> Tuner

Prepare the tuner with the configs.

Source code in src/stimulus/learner/raytune_learner.py, lines 113-185
def tuner_initialization(
    self,
    data_config_path: str,
    data_path: str,
    encoder_loader: EncoderLoader,
    *,
    autoscaler: bool = False,
) -> tune.Tuner:
    """Prepare the tuner with the configs."""
    # Get available resources from Ray cluster
    cluster_res = cluster_resources()
    logging.info(f"CLUSTER resources   ->  {cluster_res}")

    # Check per-trial resources
    try:
        if self.gpu_per_trial > cluster_res["GPU"] and not autoscaler:
            raise ValueError(
                "GPU per trial is more than what is available in the cluster, set autoscaler to True to allow for autoscaler to be used.",
            )
    except KeyError as err:
        logging.warning(f"KeyError: {err}, no GPU resources available in the cluster: {cluster_res}")

    if self.cpu_per_trial > cluster_res["CPU"] and not autoscaler:
        raise ValueError(
            "CPU per trial is more than what is available in the cluster, set autoscaler to True to allow for autoscaler to be used.",
        )

    logging.info(f"PER_TRIAL resources ->  GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

    # Pre-load and encode datasets once, then put them in Ray's object store

    training = TorchDataset(
        config_path=data_config_path,
        csv_path=data_path,
        encoder_loader=encoder_loader,
        split=0,
    )
    validation = TorchDataset(
        config_path=data_config_path,
        csv_path=data_path,
        encoder_loader=encoder_loader,
        split=1,
    )

    # Log the field names and tensor shapes for the first batch of training data
    inputs, labels, meta = training[0:10]

    logging.debug("Training data tensor shapes:")
    for field, tensor in inputs.items():
        logging.debug(f"Input field '{field}' shape: {tensor.shape}")

    for field, tensor in labels.items():
        logging.debug(f"Label field '{field}' shape: {tensor.shape}")

    for field, values in meta.items():
        logging.debug(f"Meta field '{field}' length: {len(values)}")

    training_ref = ray.put(training)
    validation_ref = ray.put(validation)

    self.config["_training_ref"] = training_ref
    self.config["_validation_ref"] = validation_ref

    # Configure trainable with resources and dataset parameters
    trainable = tune.with_resources(
        tune.with_parameters(
            TuneModel,
        ),
        resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial},
    )

    return tune.Tuner(trainable, tune_config=self.tune_config, param_space=self.config, run_config=self.run_config)