
mlflow

LogGradients(norm=2, tag=None, sep='/', round_to=3, log_all_grads=False)

Bases: Callback

Callback used to log the gradients of the model at the end of each training step.

Parameters:

  • norm (int, default: 2 ) –

    Norm to use for the gradient. Default is L2 norm.

  • tag (str | None, default: None ) –

    Tag to add to the gradients. If None, no tag will be added.

  • sep (str, default: '/' ) –

    Separator to use in the log.

  • round_to (int, default: 3 ) –

    Number of decimals to round the gradients to.

  • log_all_grads (bool, default: False ) –

    If True, log all gradients, not just the total norm.

Source code in quadra/callbacks/mlflow.py
def __init__(
    self,
    norm: int = 2,
    tag: str | None = None,
    sep: str = "/",
    round_to: int = 3,
    log_all_grads: bool = False,
):
    self.norm = norm
    self.tag = tag
    self.sep = sep
    self.round_to = round_to
    self.log_all_grads = log_all_grads
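
A minimal usage sketch, assuming a standard PyTorch Lightning setup with an MLFlowLogger attached; the experiment name, model and datamodule below are placeholders:

from pytorch_lightning import Trainer
from pytorch_lightning.loggers import MLFlowLogger

from quadra.callbacks.mlflow import LogGradients

# Log the L2 norm of the gradients, rounded to 3 decimals.
trainer = Trainer(
    logger=MLFlowLogger(experiment_name="my-experiment"),  # hypothetical name
    callbacks=[LogGradients(norm=2, round_to=3, log_all_grads=False)],
)
# trainer.fit(model, datamodule=datamodule)  # model/datamodule defined elsewhere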

on_train_batch_end(trainer, pl_module, outputs, batch, batch_idx, unused=0)

Method called at the end of each training batch.

Parameters:

  • trainer (Trainer) –

    Pytorch Lightning trainer.

  • pl_module (LightningModule) –

    Pytorch Lightning module.

  • outputs (STEP_OUTPUT) –

    Outputs of the training step.

  • batch (Any) –

    Current batch.

  • batch_idx (int) –

    Index of the current batch.

  • unused (int | None, default: 0 ) –

    Dataloader index.

Returns:

  • None

    None

Source code in quadra/callbacks/mlflow.py
@rank_zero_only
def on_train_batch_end(
    self,
    trainer: Trainer,
    pl_module: LightningModule,
    outputs: STEP_OUTPUT,
    batch: Any,
    batch_idx: int,
    unused: int | None = 0,
) -> None:
    """Method called at the end of the train batch
    Args:
        trainer: pl.trainer
        pl_module: lightning module
        outputs: outputs
        batch: batch
        batch_idx: index
        unused: dl index.


    Returns:
        None
    """
    # pylint: disable=unused-argument
    logger = get_mlflow_logger(trainer=trainer)

    if logger is None:
        return

    named_params = pl_module.named_parameters()
    grads = self._grad_norm(named_params)
    logger.log_metrics(grads)
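
The `_grad_norm` helper is not shown on this page. Below is a rough, purely illustrative sketch of the kind of computation such a helper performs, reconstructed from the constructor parameters above; the metric key names and exact rounding behaviour in quadra may differ.

def grad_norm_sketch(named_params, norm=2, tag=None, sep="/", round_to=3, log_all_grads=False):
    """Illustrative only: per-parameter and total gradient norms as a metrics dict."""
    prefix = f"grad_{norm}_norm"  # hypothetical metric key prefix
    if tag is not None:
        prefix = f"{prefix}{sep}{tag}"

    metrics = {}
    total = 0.0
    for name, param in named_params:  # e.g. model.named_parameters()
        if param.grad is None:
            continue
        param_norm = param.grad.detach().norm(norm).item()
        total += param_norm**norm
        if log_all_grads:
            metrics[f"{prefix}{sep}{name}"] = round(param_norm, round_to)

    metrics[f"{prefix}{sep}total"] = round(total ** (1.0 / norm), round_to)
    return metrics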

LogLearningRate(logging_interval=None, log_momentum=False)

Bases: LearningRateMonitor

Learning rate logger at the beginning of the training step/epoch.

Parameters:

  • logging_interval (Literal['step', 'epoch'] | None, default: None ) –

    Logging interval.

  • log_momentum (bool, default: False ) –

    If True, log momentum as well.

Source code in quadra/callbacks/mlflow.py
def __init__(self, logging_interval: Literal["step", "epoch"] | None = None, log_momentum: bool = False):
    super().__init__(logging_interval=logging_interval, log_momentum=log_momentum)
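
A usage sketch, assuming the LightningModule configures a learning rate scheduler; the experiment name is a placeholder:

from pytorch_lightning import Trainer
from pytorch_lightning.loggers import MLFlowLogger

from quadra.callbacks.mlflow import LogLearningRate

# Log the learning rate to mlflow at every logged training step.
trainer = Trainer(
    logger=MLFlowLogger(experiment_name="my-experiment"),  # hypothetical name
    callbacks=[LogLearningRate(logging_interval="step")],
)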

on_train_batch_start(trainer, *args, **kwargs)

Log learning rate at the beginning of the training step if logging interval is set to step.

Source code in quadra/callbacks/mlflow.py
def on_train_batch_start(self, trainer, *args, **kwargs):
    """Log learning rate at the beginning of the training step if logging interval is set to step."""
    if not trainer.logger_connector.should_update_logs:
        return
    if self.logging_interval != "epoch":
        logger = get_mlflow_logger(trainer=trainer)

        if logger is None:
            return

        interval = "step" if self.logging_interval is None else "any"
        latest_stat = self._extract_stats(trainer, interval)

        if latest_stat:
            logger.log_metrics(latest_stat, step=trainer.global_step)

on_train_epoch_start(trainer, *args, **kwargs)

Log learning rate at the beginning of the epoch if logging interval is set to epoch.

Source code in quadra/callbacks/mlflow.py
def on_train_epoch_start(self, trainer, *args, **kwargs):
    """Log learning rate at the beginning of the epoch if logging interval is set to epoch."""
    if self.logging_interval != "step":
        interval = "epoch" if self.logging_interval is None else "any"
        latest_stat = self._extract_stats(trainer, interval)
        logger = get_mlflow_logger(trainer=trainer)

        if logger is None:
            return

        if latest_stat:
            logger.log_metrics(latest_stat, step=trainer.global_step)

UploadCheckpointsAsArtifact(ckpt_dir='checkpoints/', ckpt_ext='ckpt', upload_best_only=False, delete_after_upload=True, upload=True)

Bases: Callback

Callback used to upload checkpoints as artifacts.

Parameters:

  • ckpt_dir (str, default: 'checkpoints/' ) –

    Local folder where the checkpoints are stored; its contents are uploaded to the checkpoints artifact folder.

  • ckpt_ext (str, default: 'ckpt' ) –

    Extension of checkpoint files (default: ckpt).

  • upload_best_only (bool, default: False ) –

    If True, upload only the best checkpoint (default: False).

  • delete_after_upload (bool, default: True ) –

    Delete the checkpoints from local storage after uploading (default: True).

  • upload (bool, default: True ) –

    If True, upload the checkpoints. If False, only keep them on the local machine.

Source code in quadra/callbacks/mlflow.py
def __init__(
    self,
    ckpt_dir: str = "checkpoints/",
    ckpt_ext: str = "ckpt",
    upload_best_only: bool = False,
    delete_after_upload: bool = True,
    upload: bool = True,
):
    self.ckpt_dir = ckpt_dir
    self.upload_best_only = upload_best_only
    self.ckpt_ext = ckpt_ext
    self.delete_after_upload = delete_after_upload
    self.upload = upload
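
A sketch of pairing this callback with Lightning's ModelCheckpoint so that ckpt_dir matches the directory the checkpoints are actually written to; the experiment name and monitored metric are placeholders:

from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import MLFlowLogger

from quadra.callbacks.mlflow import UploadCheckpointsAsArtifact

ckpt_dir = "checkpoints/"  # must match the ModelCheckpoint dirpath
trainer = Trainer(
    logger=MLFlowLogger(experiment_name="my-experiment"),  # hypothetical name
    callbacks=[
        ModelCheckpoint(dirpath=ckpt_dir, save_top_k=1, monitor="val_loss"),  # hypothetical metric
        # Upload only the best checkpoint, then remove local copies.
        UploadCheckpointsAsArtifact(ckpt_dir=ckpt_dir, upload_best_only=True),
    ],
)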

on_test_end(trainer, pl_module)

Triggered at the end of testing. Uploads all model checkpoints to mlflow as artifacts.

Parameters:

  • trainer (Trainer) –

    Pytorch Lightning trainer.

  • pl_module (LightningModule) –

    Pytorch Lightning module.

Source code in quadra/callbacks/mlflow.py
@rank_zero_only
def on_test_end(self, trainer: Trainer, pl_module: LightningModule):
    """Triggered at the end of test. Uploads all model checkpoints to mlflow as an artifact.

    Args:
        trainer: Pytorch Lightning trainer.
        pl_module: Pytorch Lightning module.
    """
    logger = get_mlflow_logger(trainer=trainer)

    if logger is None:
        return

    experiment = logger.experiment

    if (
        trainer.checkpoint_callback
        and self.upload_best_only
        and hasattr(trainer.checkpoint_callback, "best_model_path")
    ):
        if self.upload:
            experiment.log_artifact(
                run_id=logger.run_id,
                local_path=trainer.checkpoint_callback.best_model_path,
                artifact_path="checkpoints",
            )
    else:
        for path in glob.glob(os.path.join(self.ckpt_dir, f"**/*.{self.ckpt_ext}"), recursive=True):
            if self.upload:
                experiment.log_artifact(
                    run_id=logger.run_id,
                    local_path=path,
                    artifact_path="checkpoints",
                )
    if self.delete_after_upload:
        for path in glob.glob(os.path.join(self.ckpt_dir, f"**/*.{self.ckpt_ext}"), recursive=True):
            os.remove(path)

UploadCodeAsArtifact(source_dir)

Bases: Callback

Callback used to upload code as an artifact.

Uploads all *.py files to mlflow as artifacts at the end of testing. It creates a project-source folder under the mlflow artifacts, along with any necessary subfolders.

Parameters:

  • source_dir (str) –

    Folder where all the source files are stored.

Source code in quadra/callbacks/mlflow.py
def __init__(self, source_dir: str):
    self.source_dir = source_dir
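
A usage sketch; the source directory is a placeholder:

from quadra.callbacks.mlflow import UploadCodeAsArtifact

# Upload every *.py file under the given folder to the
# project-source artifact folder at the end of testing.
callback = UploadCodeAsArtifact(source_dir="src/my_project/")  # hypothetical path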

on_test_end(trainer, pl_module)

Triggered at the end of testing. Uploads all *.py files to mlflow as artifacts.

Parameters:

  • trainer (Trainer) –

    Pytorch Lightning trainer.

  • pl_module (LightningModule) –

    Pytorch Lightning module.

Source code in quadra/callbacks/mlflow.py
@rank_zero_only
def on_test_end(self, trainer: Trainer, pl_module: LightningModule):
    """Triggered at the end of test. Uploads all *.py files to mlflow as an artifact.

    Args:
        trainer: Pytorch Lightning trainer.
        pl_module: Pytorch Lightning module.
    """
    logger = get_mlflow_logger(trainer=trainer)

    if logger is None:
        return

    experiment = logger.experiment

    for path in glob.glob(os.path.join(self.source_dir, "**/*.py"), recursive=True):
        stripped_path = path.replace(self.source_dir, "")
        if len(stripped_path.split("/")) > 1:
            file_path_tree = "/" + "/".join(stripped_path.split("/")[:-1])
        else:
            file_path_tree = ""
        experiment.log_artifact(
            run_id=logger.run_id,
            local_path=path,
            artifact_path=f"project-source{file_path_tree}",
        )

check_file_server_dependencies()

Check that file server dependencies such as boto3 and minio are installed.

Returns:

  • None

    None

Source code in quadra/callbacks/mlflow.py
def check_file_server_dependencies() -> None:
    """Check file dependencies as boto3.

    Returns:
        None
    """
    try:
        # pylint: disable=unused-import,import-outside-toplevel
        import boto3  # noqa
        import minio  # noqa
    except ImportError as e:
        raise ImportError(
            "You are trying to upload mlflow artifacts, but boto3 and minio are not installed. Please install them by"
            " calling pip install minio boto3."
        ) from e

check_minio_credentials()

Check that minio credentials are set for AWS/S3 based storage such as minio.

Returns:

  • None

    None

Source code in quadra/callbacks/mlflow.py
def check_minio_credentials() -> None:
    """Check minio credentials for aws based storage such as minio.

    Returns:
        None
    """
    check = os.environ.get("AWS_ACCESS_KEY_ID") is not None and os.environ.get("AWS_SECRET_ACCESS_KEY") is not None
    if not check:
        raise ValueError(
            "You are trying to upload mlflow artifacts, but minio credentials are not set. Please set them in your"
            " environment variables."
        )
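
The credentials this check looks for can be provided through the environment before the run starts; the values below are placeholders:

import os

from quadra.callbacks.mlflow import check_minio_credentials

# Placeholder credentials for an S3-compatible store such as minio.
os.environ["AWS_ACCESS_KEY_ID"] = "my-access-key"
os.environ["AWS_SECRET_ACCESS_KEY"] = "my-secret-key"

check_minio_credentials()  # passes now that both variables are set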

validate_artifact_storage(logger)

Validate artifact storage.

Parameters:

  • logger (MLFlowLogger) –

    Mlflow logger from pytorch lightning.

Source code in quadra/callbacks/mlflow.py
def validate_artifact_storage(logger: MLFlowLogger):
    """Validate artifact storage.

    Args:
        logger: Mlflow logger from pytorch lightning.

    """
    from quadra.utils.utils import get_logger  # pylint: disable=[import-outside-toplevel]

    log = get_logger(__name__)

    client = logger.experiment
    # TODO: we have to access the internal api to get the artifact uri, however there could be a better way
    artifact_uri = client._tracking_client._get_artifact_repo(  # pylint: disable=protected-access
        logger.run_id
    ).artifact_uri
    if artifact_uri.startswith("s3://"):
        check_minio_credentials()
        check_file_server_dependencies()
        log.info("Mlflow artifact storage is AWS/S3 basedand credentials and dependencies are satisfied.")
    else:
        log.info("Mlflow artifact storage uri is %s. Validation checks are not implemented.", artifact_uri)