Recommended way to run many PyMC models in parallel (joblib) without triggering compiledir / cache lock errors?

I think I managed to solve this with joblib's `initializer` argument, giving each worker process its own PyTensor compiledir. After I passed the following function as the initializer, the jobs started running without errors:

def set_unique_pytensor_compiledir(
    *,
    base_dir: "str | os.PathLike[str] | None" = None,
    prefix: str = "spectranorm_pytensor_",
) -> Path:
    """Point this process's PyTensor compile cache at a fresh private directory.

    Intended to be used as a joblib/loky worker ``initializer``. Each worker
    then compiles into its own directory instead of contending for the file
    lock on the shared default compiledir.

    The directory is created first and only then exported through the
    ``PYTENSOR_FLAGS`` environment variable. Any flags already present in
    ``PYTENSOR_FLAGS`` (for example ``floatX=...``) are preserved. Only a
    previous ``compiledir=`` entry is replaced.

    Caveat: PyTensor reads ``PYTENSOR_FLAGS`` when it is first imported. If
    ``pytensor`` is already in ``sys.modules`` when this runs (e.g. because
    the module defining the initializer imports it at top level), the new
    compiledir will most likely not be used by this process. In that case a
    ``RuntimeWarning`` is emitted so the problem is visible rather than silent.

    The created directory is never removed by this function.

    Args:
        base_dir: Parent directory for the new compiledir. ``None`` uses the
            platform temp directory (``tempfile`` default).
        prefix: Prefix for the generated directory name.

    Returns:
        Path to the newly created compiledir.
    """
    import sys
    import warnings

    # mkdtemp both picks a unique name and creates the directory, so it
    # already exists by the time PyTensor is told about it.
    parent = None if base_dir is None else os.fspath(base_dir)
    unique_dir = Path(tempfile.mkdtemp(prefix=prefix, dir=parent))

    # PYTENSOR_FLAGS is a comma-separated list of key=value pairs. Keep the
    # user's other flags instead of clobbering them, and drop any earlier
    # compiledir entry so repeated calls don't stack duplicates.
    # NOTE: assumes no flag value contains a literal comma.
    existing = os.environ.get("PYTENSOR_FLAGS", "")
    kept = [
        flag
        for flag in (part.strip() for part in existing.split(","))
        if flag and not flag.startswith("compiledir=")
    ]
    kept.append(f"compiledir={unique_dir}")
    os.environ["PYTENSOR_FLAGS"] = ",".join(kept)

    if "pytensor" in sys.modules:
        warnings.warn(
            "pytensor was already imported in this process, so the new "
            f"compiledir ({unique_dir}) may not be used here; set "
            "PYTENSOR_FLAGS before pytensor is imported.",
            RuntimeWarning,
            stacklevel=2,
        )

    return unique_dir

However, after more than 3,000 of the 10,000 tasks have completed, it stops with the following error:

2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Starting SNM model fitting:
2025-11-16 23:41:13 : [INFO] - spectranorm.snm - Step 1; direct models for each eigenmode (10000 modes)

Fitting direct models:  31%
3090/10000 [1:02:59<6:17:50,  3.28s/tasks]

/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.
  warnings.warn(
Exception ignored in atexit callback: <bound method ModuleCache._on_atexit of <pytensor.link.c.cmodule.ModuleCache object at 0x7fd568abb400>>
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1588, in _on_atexit
    self.clear_old()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1387, in clear_old
    too_old_to_use = self.refresh(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 824, in refresh
    with lock_ctx():
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/compile/compilelock.py", line 78, in lock_ctx
    fl.acquire(timeout=timeout)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/filelock/_api.py", line 341, in acquire
    raise Timeout(lock_filename)  # noqa: TRY301
filelock._error.Timeout: The file lock '/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock' could not be acquired.

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 490, in _process_worker
    r = call_item()
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in __call__
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py", line 607, in <listcomp>
    return [func(*args, **kwargs) for func, args, kwargs in self.items]
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 3503, in fit_single_direct
    direct_model.fit(
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1840, in fit
    self._fit_model_with_advi(progress_bar=progress_bar)
  File "/mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py", line 1712, in _fit_model_with_advi
    self._trace = pm.fit(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 775, in fit
    return inference.fit(n, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/inference.py", line 158, in fit
    step_func = self.objective.step_function(score=score, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 393, in step_function
    updates = self.updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 268, in updates
    self.add_obj_updates(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 313, in add_obj_updates
    obj_target = self(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 458, in __call__
    a = self.op.apply(self.tf)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/operators.py", line 63, in apply
    return -self.datalogp_norm + self.beta * (self.logq_norm - self.varlogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 498, in <lambda>
    datalogp_norm = property(lambda self: self.approx.datalogp_norm)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1377, in datalogp_norm
    return self.datalogp / self.symbolic_normalizing_constant
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1341, in datalogp
    return self.sized_symbolic_datalogp.mean(0)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1321, in sized_symbolic_datalogp
    return self._sized_symbolic_varlogp_and_datalogp[1]  # shape (s,)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/cachetools/_cachedmethod.py", line 97, in wrapper
    v = method(self, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/configparser.py", line 44, in res
    return f(*args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/variational/opvi.py", line 1309, in _sized_symbolic_varlogp_and_datalogp
    [self.model.varlogp, self.model.datalogp]
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 810, in varlogp
    return self.logp(vars=self.free_RVs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/model/core.py", line 696, in logp
    rv_logps = transformed_conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 595, in transformed_conditional_logp
    temp_logp_terms = conditional_logp(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/basic.py", line 479, in conditional_logp
    fgraph = construct_ir_fgraph(rv_values, ir_rewriter=ir_rewriter)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pymc/logprob/rewriting.py", line 254, in construct_ir_fgraph
    ir_rewriter.rewrite(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 120, in rewrite
    return self.apply(fgraph, *args, **kwargs)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 292, in apply
    sub_prof = rewriter.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2456, in apply
    sub_prof = grewrite.apply(fgraph)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 2040, in apply
    nb += self.process_node(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1925, in process_node
    self.failure_callback(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1780, in warn_inplace
    return cls.warn(exc, nav, repl_pairs, node_rewriter, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1768, in warn
    raise exc
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1922, in process_node
    replacements = node_rewriter.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1160, in constant_folding
    return unconditional_constant_folding.transform(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/graph/rewriting/basic.py", line 1086, in transform
    return self.fn(fgraph, node)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/tensor/rewriting/basic.py", line 1109, in unconditional_constant_folding
    thunk = node.op.make_thunk(node, storage_map, compute_map, no_recycling=[])
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 125, in make_thunk
    return self.make_c_thunk(node, storage_map, compute_map, no_recycling)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/op.py", line 84, in make_c_thunk
    outputs = cl.make_thunk(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1185, in make_thunk
    cthunk, module, in_storage, out_storage, error_storage = self.__compile__(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1102, in __compile__
    thunk, module = self.cthunk_factory(
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/basic.py", line 1626, in cthunk_factory
    module = cache.module_from_key(key=key, lnk=self)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1217, in module_from_key
    module = self._get_from_hash(module_hash, key)
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 1122, in _get_from_hash
    key_data.add_key(key, save_pkl=bool(key[0]))
  File "/mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/pytensor/link/c/cmodule.py", line 550, in add_key
    assert key not in self.keys
AssertionError
"""

The above exception was the direct cause of the following exception:

AssertionError                            Traceback (most recent call last)
File <timed exec>:24

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:4065, in SpectralNormativeModel.adapt_fit(self, covariate_to_adapt, new_category_names, encoded_train_data, covariates_dataframe, pretrained_model_params, n_jobs, save_directory, save_separate)
   4062     pretrained_model_params = copy.deepcopy(self.model_params)
   4064 # Fit the adapted model
-> 4065 self.fit(
   4066     encoded_train_data,
   4067     covariates_dataframe,
   4068     n_modes=pretrained_model_params["n_modes"],
   4069     n_jobs=n_jobs,
   4070     save_directory=save_directory,
   4071     save_separate=save_separate,
   4072     covariance_structure=pretrained_model_params["sparse_covariance_structure"],
   4073     adapt={
   4074         "covariate_to_adapt": covariate_to_adapt,
   4075         "new_category_names": new_category_names,
   4076         "pretrained_model_params": pretrained_model_params,
   4077     },
   4078 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3933, in SpectralNormativeModel.fit(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, covariance_structure, adapt)
   3926     utils.general.prepare_save_directory(
   3927         save_directory,
   3928         "spectral_normative_model",
   3929     )
   3931 logger.info("Step 1; direct models for each eigenmode (%s modes)", n_modes)
-> 3933 self.fit_all_direct(
   3934     encoded_train_data=encoded_train_data,
   3935     covariates_dataframe=covariates_dataframe,
   3936     n_modes=n_modes,
   3937     n_jobs=n_jobs,
   3938     save_directory=save_directory,
   3939     save_separate=save_separate,
   3940     adapt=adapt,
   3941 )
   3943 logger.info("Step 2; identify sparse covariance structure")
   3945 self.identify_covariance_structure(
   3946     encoded_train_data=encoded_train_data,
   3947     covariates_dataframe=covariates_dataframe,
   (...)
   3950     adapt=adapt,
   3951 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/snm.py:3688, in SpectralNormativeModel.fit_all_direct(self, encoded_train_data, covariates_dataframe, n_modes, n_jobs, save_directory, save_separate, adapt)
   3659 # Fit the base direct model for each eigenmode using parallel processing
   3660 tasks = (
   3661     joblib.delayed(self.fit_single_direct)(
   3662         variable_of_interest=encoded_train_data[:, i],
   (...)
   3685     for i in range(n_modes)
   3686 )
   3687 self.direct_model_params = list(
-> 3688     utils.parallel.ParallelTqdm(
   3689         n_jobs=n_jobs,
   3690         total_tasks=n_modes,
   3691         desc="Fitting direct models",
   3692         initializer=utils.general.set_unique_pytensor_compiledir,
   3693     )(tasks),  # pyright: ignore[reportCallIssue]
   3694 )

File /mountpoint/code/projects/spectranorm/package/spectranorm/src/spectranorm/utils/parallel.py:88, in ParallelTqdm.__call__(self, iterable)
     86             self.total_tasks = len(iterable)
     87     # call parent function
---> 88     return super().__call__(iterable)
     89 finally:
     90     # close tqdm progress bar
     91     if self.progress_bar is not None:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:2072, in Parallel.__call__(self, iterable)
   2066 # The first item from the output is blank, but it makes the interpreter
   2067 # progress until it enters the Try/Except block of the generator and
   2068 # reaches the first `yield` statement. This starts the asynchronous
   2069 # dispatch of the tasks to the workers.
   2070 next(output)
-> 2072 return output if self.return_generator else list(output)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1682, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1679     yield
   1681     with self._backend.retrieval_context():
-> 1682         yield from self._retrieve()
   1684 except GeneratorExit:
   1685     # The generator has been garbage collected before being fully
   1686     # consumed. This aborts the remaining tasks if possible and warn
   1687     # the user if necessary.
   1688     self._exception = True

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1784, in Parallel._retrieve(self)
   1778 while self._wait_retrieval():
   1779     # If the callback thread of a worker has signaled that its task
   1780     # triggered an exception, or if the retrieval loop has raised an
   1781     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1782     # worker traceback.
   1783     if self._aborting:
-> 1784         self._raise_error_fast()
   1785         break
   1787     nb_jobs = len(self._jobs)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:1859, in Parallel._raise_error_fast(self)
   1855 # If this error job exists, immediately raise the error by
   1856 # calling get_result. This job might not exists if abort has been
   1857 # called directly or if the generator is gc'ed.
   1858 if error_job is not None:
-> 1859     error_job.get_result(self.timeout)

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:758, in BatchCompletionCallBack.get_result(self, timeout)
    752 backend = self.parallel._backend
    754 if backend.supports_retrieve_callback:
    755     # We assume that the result has already been retrieved by the
    756     # callback thread, and is stored internally. It's just waiting to
    757     # be returned.
--> 758     return self._return_or_raise()
    760 # For other backends, the main thread needs to run the retrieval step.
    761 try:

File /mountpoint/code/projects/spectranorm/environment/spectranorm_env/lib/python3.10/site-packages/joblib/parallel.py:773, in BatchCompletionCallBack._return_or_raise(self)
    771 try:
    772     if self.status == TASK_ERROR:
--> 773         raise self._result
    774     return self._result
    775 finally:

AssertionError: 

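Do you have any idea why this might be happening? One thing I noticed: the lock that times out (`/home/ubuntu/.pytensor/compiledir_Linux-5.4--generic-x86_64-with-glibc2.31-x86_64-3.10.18-64/.lock`) is in the default PyTensor compiledir, not in one of the per-worker `spectranorm_pytensor_*` directories created by the initializer.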