Source code for pyace.fitadapter

import logging
import numpy as np
import pandas as pd
import warnings

from typing import Dict, Union, Callable

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

from pyace.const import *
from pyace.basis import BBasisConfiguration, ACEBBasisSet

from pyace.multispecies_basisextension import compute_bbasisset_train_mask, expand_trainable_parameters
from pyace.lossfuncspec import LossFunctionSpecification


[docs]class BackendConfig:
[docs] def __init__(self, backend_config_dict: Dict): self.backend_config_dict = backend_config_dict self.validate()
@property def evaluator_name(self): return self.backend_config_dict[BACKEND_EVALUATOR_KW] def __getitem__(self, item): return self.backend_config_dict[item] def __setitem__(self, key, value): self.backend_config_dict[key] = value
[docs] def validate(self): pass
[docs] def get(self, item, default_value=None): return self.backend_config_dict.get(item, default_value)
[docs]class FitBackendAdapter:
[docs] def __init__(self, backend_config: Union[Dict, BackendConfig], loss_spec: LossFunctionSpecification = None, fit_config: Dict = None, callback: Callable = None, fit_metrics_callback: Callable = None, test_metrics_callback: Callable = None): if isinstance(backend_config, dict): self.backend_config = BackendConfig(backend_config) else: self.backend_config = backend_config self.callback = callback self.loss_spec = loss_spec self.fit_config = fit_config self.res_opt = None self.fitter = None self.metrics = None self.fit_metrics_callback = fit_metrics_callback self.test_metrics_callback = test_metrics_callback
@property def evaluator_name(self): return self.backend_config.evaluator_name
[docs] def fit(self, bbasisconfig: BBasisConfiguration, dataframe: pd.DataFrame, loss_spec: LossFunctionSpecification = None, fit_config: Dict = None, callback: Callable = None, test_dataframe: pd.DataFrame = None ) -> BBasisConfiguration: if loss_spec is None: loss_spec = self.loss_spec else: self.loss_spec = loss_spec if fit_config is None: fit_config = self.fit_config if callback is not None: self.callback = callback trainable_parameters = fit_config.get("trainable_parameters", []) # default value = [] -> ["ALL"] # convert fit_blocks to indices of the blocks to fit elements = ACEBBasisSet(bbasisconfig).elements_name trainable_parameters_dict = expand_trainable_parameters(elements=elements, trainable_parameters=trainable_parameters) log.info("Trainable parameters: {}".format(trainable_parameters_dict)) # save globally self.trainable_parameters_dict = trainable_parameters_dict self.bbasisconfig = bbasisconfig if self.backend_config.evaluator_name == TENSORPOT_EVAL: from tensorflow.python.framework.errors_impl import ResourceExhaustedError while True: try: self.setup_tensorpot(bbasisconfig, dataframe, loss_spec, trainable_parameters_dict) fit_res = self.run_tensorpot_fit(bbasisconfig, dataframe, loss_spec, fit_config, trainable_parameters_dict, test_dataframe=test_dataframe) self.log_optimization_result() return fit_res except ResourceExhaustedError as e: log.error("ResourceExhaustedError errors encountered") if self.backend_config.get(BACKEND_BATCH_SIZE_REDUCTION_KW, True): batch_size = self.backend_config.get(BACKEND_BATCH_SIZE_KW, 10) batch_size_reduction_factor = self.backend_config.get(BACKEND_BATCH_SIZE_REDUCTION_FACTOR_KW, 1.618) # default - golden ratio new_batch_size = int(batch_size / batch_size_reduction_factor) log.info("Decrease batch size (by factor {}): {} -> {}".format(batch_size_reduction_factor, batch_size, new_batch_size)) if new_batch_size < 1: log.error("New batch size is too small, stopping") raise RuntimeError("No further batch size reduction is possible, stopping") self.backend_config[BACKEND_BATCH_SIZE_KW] = new_batch_size else: log.error("Use `backend:batch_size_reduction` option for automatic batch size reduction") raise RuntimeError("ResourceExhaustedError errors encountered. " + "Consider using `backend::{}=true` option for automatic batch size reduction".format( BACKEND_BATCH_SIZE_REDUCTION_KW)) except Exception as e: raise e elif self.backend_config.evaluator_name == PYACE_EVAL: self.setup_pyace(bbasisconfig, dataframe, loss_spec, trainable_parameters_dict) fit_res = self.run_pyace_fit(bbasisconfig, dataframe, loss_spec, fit_config, trainable_parameters_dict) self.log_optimization_result() return fit_res else: raise ValueError('{0} is not a valid evaluator'.format(self.backend_config.evaluator_name))
[docs] def log_optimization_result(self, res_opt=None): if res_opt is None: res_opt = self.res_opt try: log.info( "Optimization result(success={success}, status={status}, message={message}, nfev={nfev}, njev={njev})".format( success=res_opt.success, status=res_opt.status, message=res_opt.message, nfev=res_opt.nfev, njev=res_opt.njev )) except Exception as e: log.error("Optimization result: not available: " + str(e))
[docs] def get_evaluator_version_dict(self): try: if self.backend_config.evaluator_name == TENSORPOT_EVAL: import tensorpotential return {TENSORPOT_EVAL + "_version": tensorpotential.__version__} elif self.backend_config.evaluator_name == PYACE_EVAL: from pyace import __version__, get_ace_evaluator_version return {PYACE_EVAL + "_version": __version__, "ace_evaluator_version": get_ace_evaluator_version()} else: raise ValueError('{0} is not a valid evaluator'.format(self.backend_config.evaluator_name)) except Exception as e: log.error(e) return {}
[docs] def setup_tensorpot(self, bbasisconfig: BBasisConfiguration, dataframe: pd.DataFrame, loss_spec: LossFunctionSpecification, trainable_parameters_dict: Dict ) -> BBasisConfiguration: from tensorpotential.potentials.ace import ACE from tensorpotential.tensorpot import TensorPotential from tensorpotential.fit import FitTensorPotential from tensorpotential.utils.utilities import batching_data, init_gpu_config from tensorpotential.constants import (LOSS_TYPE, LOSS_FORCE_FACTOR, LOSS_ENERGY_FACTOR, L1_REG, L2_REG, AUX_LOSS_FACTOR) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) gpu_config = self.backend_config.get(BACKEND_GPU_CONFIG, None) init_gpu_config(gpu_config) batch_size = self.backend_config.get(BACKEND_BATCH_SIZE_KW, 10) log.info("Loss function specification: " + str(loss_spec)) log.info("Batch size: {}".format(batch_size)) batches = batching_data(dataframe, batch_size=batch_size) n_batches = len(batches) loss_force_factor = loss_spec.kappa has_smoothness = (np.array([loss_spec.w0_rad, loss_spec.w1_rad, loss_spec.w2_rad]) != 0).any() has_orthogonality = loss_spec.w_orth != 0 aux_loss_factor = [] if has_orthogonality: aux_loss_factor += [np.float64(loss_spec.w_orth) / n_batches] if has_smoothness: aux_loss_factor += [np.float64(loss_spec.w0_rad) / n_batches, np.float64(loss_spec.w1_rad) / n_batches, np.float64(loss_spec.w2_rad) / n_batches] loss_specs = { LOSS_TYPE: 'per-atom', LOSS_FORCE_FACTOR: loss_force_factor, LOSS_ENERGY_FACTOR: (1 - loss_force_factor), L1_REG: np.float64(loss_spec.L1_coeffs) / n_batches, L2_REG: np.float64(loss_spec.L2_coeffs) / n_batches } if aux_loss_factor: loss_specs[AUX_LOSS_FACTOR] = aux_loss_factor ace_potential = ACE(bbasisconfig, compute_orthogonality=has_orthogonality, compute_smoothness=has_smoothness) tensorpotential = TensorPotential(ace_potential, loss_specs=loss_specs) display_step = self.backend_config.get('display_step', 20) self.fitter = FitTensorPotential(tensorpotential, display_step=display_step) # assign total_number_of_functions to fitter total_number_of_functions = bbasisconfig.total_number_of_functions self.fitter.nfuncs = total_number_of_functions
[docs] def run_tensorpot_fit(self, bbasisconfig: BBasisConfiguration, dataframe: pd.DataFrame, loss_spec: LossFunctionSpecification, fit_config: Dict, trainable_parameters_dict: Dict, test_dataframe: pd.DataFrame = None ) -> BBasisConfiguration: with warnings.catch_warnings(): # adapt call of self._callback(current_bbasisconfig) from FitTensorPotential.callback(coeffs) def adapted_callback(coeffs): new_config = self.fitter.tensorpot.potential.get_updated_config(updating_coefs=coeffs) self._callback(new_config) jacobian_factor = compute_bbasisset_train_mask(bbasisconfig, trainable_parameters_dict) if np.all(jacobian_factor): jacobian_factor = None # default value - train all else: jacobian_factor = jacobian_factor.astype(np.float) batch_size = self.backend_config.get(BACKEND_BATCH_SIZE_KW, 10) fit_options = fit_config.get(FIT_OPTIONS_KW, None) self.fitter.fit(dataframe, test_df=test_dataframe, niter=fit_config[FIT_NITER_KW], optimizer=fit_config[FIT_OPTIMIZER_KW], batch_size=batch_size, jacobian_factor=jacobian_factor, callback=adapted_callback, # call adapted_callback to pass current_bbasisconfig options=fit_options, fit_metric_callback=self.fit_metrics_callback, test_metric_callback=self.test_metrics_callback ) self.res_opt = self.fitter.res_opt new_config = self.fitter.tensorpot.potential.get_updated_config(updating_coefs=self.res_opt.x) return new_config
[docs] def setup_pyace(self, bbasisconfig: BBasisConfiguration, dataframe: pd.DataFrame, loss_spec: LossFunctionSpecification, trainable_parameters_dict: Dict ) -> BBasisConfiguration: from pyace.pyacefit import PyACEFit parallel_mode = self.backend_config.get(BACKEND_PARALLEL_MODE_KW) or "serial" batch_size = len(dataframe) log.info("Loss function specification: " + str(loss_spec)) display_step = self.backend_config.get('display_step', 20) # TODO: consider loss_spec.w_orth self.fitter = PyACEFit(basis=bbasisconfig, loss_spec=loss_spec, executors_kw_args=dict(parallel_mode=parallel_mode, batch_size=batch_size, n_workers=self.backend_config.get(BACKEND_NWORKERS_KW, None) ), seed=42, display_step=display_step, trainable_parameters=trainable_parameters_dict) # maxiter = fit_config.get(FIT_NITER_KW, 100) # # fit_options = fit_config.get(FIT_OPTIONS_KW, {}) # options = {"maxiter": maxiter, "disp": True} # options.update(fit_options) # assign total_number_of_functions to fitter total_number_of_functions = bbasisconfig.total_number_of_functions self.fitter.nfuncs = total_number_of_functions
[docs] def run_pyace_fit(self, bbasisconfig: BBasisConfiguration, dataframe: pd.DataFrame, loss_spec: LossFunctionSpecification, fit_config: Dict, trainable_parameters_dict: Dict, test_dataframe: pd.DataFrame = None ) -> BBasisConfiguration: maxiter = fit_config.get(FIT_NITER_KW, 100) fit_options = fit_config.get(FIT_OPTIONS_KW, {}) options = {"maxiter": maxiter, "disp": True} options.update(fit_options) self.fitter.fit(structures_dataframe=dataframe, method=fit_config[FIT_OPTIMIZER_KW], options=options, callback=self._callback, fit_metric_callback=self.fit_metrics_callback ) self.res_opt = self.fitter.res_opt new_bbasisconf = self.fitter.bbasis_opt.to_BBasisConfiguration() # bbasisconfig.set_all_coeffs(new_bbasisconf.get_all_coeffs()) return new_bbasisconf
[docs] def setup_backend_for_predict(self, bbasisconfig): if bbasisconfig is None: raise ValueError("`bbasisconfig` couldn't be None for FitAdapter.setup_backend_for_predict") log.info("Setting {} backend for predicting".format(self.backend_config.evaluator_name)) if self.backend_config.evaluator_name == TENSORPOT_EVAL: from tensorpotential.potentials.ace import ACE from tensorpotential.tensorpot import TensorPotential from tensorpotential.fit import FitTensorPotential ace_pot = ACE(bbasisconfig) tp = TensorPotential(ace_pot) self.fitter = FitTensorPotential(tensorpot=tp, eager=True) elif self.backend_config.evaluator_name == PYACE_EVAL: from pyace import PyACEFit self.fitter = PyACEFit(bbasisconfig) else: raise ValueError('{0} is not a valid evaluator'.format(self.backend_config.evaluator_name))
[docs] def predict(self, structures_dataframe=None, bbasisconfig=None): if self.fitter is None: self.setup_backend_for_predict(bbasisconfig) return self.fitter.predict(structures_dataframe)
[docs] def compute_metrics(self, energy_col='energy_corrected', nat_column='NUMBER_OF_ATOMS', force_col='forces'): results = {} prediction = self.predict() l1, l2, smth1, smth2, smth3 = self.fitter.get_reg_components() datadf = self.fitter.get_fitting_data() datadf[force_col] = datadf[force_col].apply(np.array) datadf['w_forces'] = datadf['w_forces'].apply(np.reshape, newshape=[-1, 1]) de = prediction['energy_pred'] - datadf[energy_col] df = prediction['forces_pred'] - datadf[force_col] e_loss = np.float(np.sum(datadf['w_energy'] * de ** 2)) f_loss = np.sum((datadf['w_forces'] * df ** 2).map(np.sum)) mae_pae = np.mean(np.abs(de / datadf[nat_column])) mae_e = np.mean(np.abs(de)) mae_f = np.mean(np.abs(df).map(np.mean)) rmse_pae = np.sqrt(np.mean(de ** 2 / datadf[nat_column])) rmse_e = np.sqrt(np.mean(de ** 2)) rmse_f = np.sqrt(np.mean((df ** 2).map(np.mean))) results['mae_pae'] = mae_pae results['mae_e'] = mae_e results['mae_f'] = mae_f results['rmse_pae'] = rmse_pae results['rmse_e'] = rmse_e results['rmse_f'] = rmse_f results['e_loss'] = e_loss results['f_loss'] = f_loss results['l1'] = l1 results['l2'] = l2 results['radial_smooth'] = [smth1, smth2, smth3] return results
[docs] def print_detailed_metrics(self, title='Iteration:'): if self.fitter is not None: self.fitter.print_detailed_metrics(title=title)
[docs] def print_extended_metrics(self, title='Iteration:'): if self.fitter is not None: self.fitter.print_extended_metrics(title=title)
def _callback(self, current_bbasisconfig: BBasisConfiguration): if self.callback is not None: self.callback(current_bbasisconfig) @property def last_loss(self): if self.backend_config.evaluator_name == TENSORPOT_EVAL: return self.fitter.loss_history[-1] elif self.backend_config.evaluator_name == PYACE_EVAL: return self.fitter.last_loss @property def last_fit_metric_data(self): if self.fitter is not None: last_fit_metric_data = self.fitter.last_fit_metric_data if last_fit_metric_data is None: last_fit_metric_data = {} return last_fit_metric_data @last_fit_metric_data.setter def last_fit_metric_data(self, value): if self.fitter is not None: self.fitter.last_fit_metric_data = value @property def last_test_metric_data(self): if self.fitter is not None: last_test_metric_data = self.fitter.last_test_metric_data if last_test_metric_data is None: last_test_metric_data = {} return last_test_metric_data @last_test_metric_data.setter def last_test_metric_data(self, value): if self.fitter is not None: self.fitter.last_test_metric_data = value