import logging
import numpy as np
import pandas as pd
import time
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
from functools import partial
from typing import Union
from scipy.optimize import minimize
from pyace.basis import ACEBBasisSet, BBasisConfiguration
from pyace.evaluator import ACECTildeEvaluator, ACEBEvaluator
from pyace.calculator import ACECalculator
from pyace.paralleldataexecutor import ParallelDataExecutor, LOCAL_DATAFRAME_VARIALBE_NAME
from pyace.radial import *
from pyace.multispecies_basisextension import expand_trainable_parameters, compute_bbasisset_train_mask
from pyace.lossfuncspec import LossFunctionSpecification
from pyace.const import *
from pyace.metrics_aggregator import FitMetrics, MetricsAggregator
import __main__
required_structures_dataframe_columns = [ATOMIC_ENV_COL, ENERGY_CORRECTED_COL, FORCES_COL]
[docs]def batch_compute_energy_forces_function_wrapper(batch_indices, cbasis):
_local_df = getattr(__main__, LOCAL_DATAFRAME_VARIALBE_NAME)
batch_df = _local_df.loc[batch_indices]
ace = ACECalculator()
evaluator = ACECTildeEvaluator()
evaluator.set_basis(cbasis)
ace.set_evaluator(evaluator)
def pure_row_func(ae):
ace.compute(ae)
return ace.energy, ace.forces
if isinstance(batch_df, pd.Series):
return batch_df.map(pure_row_func)
elif isinstance(batch_df, pd.DataFrame):
return batch_df.apply(pure_row_func, axis=1)
[docs]def batch_compute_projections_function_wrapper(batch_indices, potential_params):
_local_df = getattr(__main__, LOCAL_DATAFRAME_VARIALBE_NAME)
batch_df = _local_df.loc[batch_indices]
ace = ACECalculator()
potential_params.is_sort_functions = False
if isinstance(potential_params, BBasisConfiguration):
bbasis = ACEBBasisSet(potential_params)
evaluator = ACEBEvaluator()
evaluator.set_basis(bbasis)
elif isinstance(potential_params, ACECTildeBasisSet):
evaluator = ACECTildeEvaluator()
evaluator.set_basis(potential_params)
elif isinstance(potential_params, ACEBBasisSet):
evaluator = ACEBEvaluator()
evaluator.set_basis(potential_params)
else:
raise ValueError("Unrecognized `potential_params` type: {}. Should be BBasisConfiguration, ACECTildeBasisSet "
"or ACEBBasisSet".format(type(potential_params)))
ace.set_evaluator(evaluator)
def pure_row_func(ae):
ace.compute(ae)
nat = ae.n_atoms_real
# flatten projections
pr = [[pp for p in pr1 for pp in p] + [pp for p in pr2 for pp in p] for pr1, pr2 in
zip(ace.basis_projections_rank1, ace.basis_projections)]
return pr
if isinstance(batch_df, pd.Series):
return batch_df.map(pure_row_func)
elif isinstance(batch_df, pd.DataFrame):
return batch_df.apply(pure_row_func, axis=1)
[docs]class PyACEFit:
"""
Create a class for fitting ACE potential
:param basis (BBasisConfiguration, ACEBBasisSet) basis set specification
:param loss_spec (LossFunctionSpecification)
"""
[docs] def __init__(self, basis: Union[BBasisConfiguration] = None, structures_dataframe: pd.DataFrame = None,
loss_spec: LossFunctionSpecification = None, seed=None, executors_kw_args=None, display_step=10,
trainable_parameters=None):
if basis is not None:
if isinstance(basis, BBasisConfiguration):
self.bbasis = ACEBBasisSet(basis)
else:
raise ValueError(
"`basis` argument should be either 'BBasisConfiguration' or 'ACEBBasisSet', but got " + type(basis))
# blocks = basis.funcspecs_blocks
# if trainable_parameters is None:
# log.info("`trainable_parameters_dict` is not provided, all blocks will be fitted")
self.elements_name = self.bbasis.elements_name
self.trainable_parameters_dict = expand_trainable_parameters(self.elements_name, trainable_parameters)
self.elements_ind_map = {el: i for i, el in enumerate(self.elements_name)}
self._init_trainable_params()
self.cbasis = self.bbasis.to_ACECTildeBasisSet()
else:
self.bbasis = None
self.cbasis = None
self.trainable_params = None
if loss_spec is None:
loss_spec = LossFunctionSpecification()
self.loss_spec = loss_spec
self.seed = seed
self._structures_dataframe = None
self.eval_count = 0
self.iter_num = 0
self.best_loss = None
self.best_params = None
self.res_opt = None
self.params_opt = None
self.bbasis_opt = None
self.cbasis_opt = None
self.data_executor = None
self.executors_kw_args = executors_kw_args or {}
self.global_callback = None
self.initial_loss = None
self.last_loss = None
self.last_epa_mae = None
self.l1 = None
self.l2 = None
self.smooth_quad = None
self._metrics = None
self.eval_time = None
self.display_step = display_step
self.nfuncs = None
if structures_dataframe is not None:
self.structures_dataframe = structures_dataframe
self.fit_metrics_data_dict = {}
self.last_fit_metric_data = None
self.last_test_metric_data = None
self.fit_metric_callback = None
def _init_trainable_params(self):
self.trainable_params_mask = compute_bbasisset_train_mask(self.bbasis, self.trainable_parameters_dict)
self.trainable_params = np.array(self.bbasis.all_coeffs)[self.trainable_params_mask]
@property
def structures_dataframe(self):
return self._structures_dataframe
@property
def metrics(self):
if self._metrics is None:
self.setup_metrics()
return self._metrics
@structures_dataframe.setter
def structures_dataframe(self, value):
self._structures_dataframe = value # .copy()
# self.preprocess_dataframe(self._structures_dataframe)
[docs] def preprocess_dataframe(self, structures_dataframe):
# TODO: energies and forces weights are generated here, if columns not provided
for col in required_structures_dataframe_columns:
if col not in structures_dataframe.columns:
raise ValueError("`structures_dataframe` doesn't contain column {}".format(col))
if FORCES_COL in structures_dataframe.columns:
structures_dataframe[FORCES_COL] = structures_dataframe[FORCES_COL].map(np.array)
if FWEIGHTS_COL in structures_dataframe.columns:
structures_dataframe[FWEIGHTS_COL] = structures_dataframe[FWEIGHTS_COL].map(np.array)
# normalize_energy_forces_weights(structures_dataframe)
[docs] def update_bbasis(self, params):
assert sum(self.trainable_params_mask) == len(params), \
"update_bbasis::trainable parameters mask({}) is inconsistent with params({})" \
.format(sum(self.trainable_params_mask), len(params))
np_array = np.array(self.bbasis.all_coeffs)
np_array[self.trainable_params_mask] = params
self.bbasis.all_coeffs = np_array
return self.bbasis
[docs] def get_cbasis(self, params):
self.bbasis = self.update_bbasis(params)
self.cbasis = self.bbasis.to_ACECTildeBasisSet()
return self.cbasis
[docs] def loss(self, params=None, verbose=False):
if params is None:
params = self.trainable_params
self.eval_count += 1
t0 = time.time()
energy_forces_pred_df = self.predict_energy_forces(params, keep_parallel_dataexecutor=True)
total_na = self.structures_dataframe["NUMBER_OF_ATOMS"].values
dE = (energy_forces_pred_df[ENERGY_PRED_COL] - self.structures_dataframe[ENERGY_CORRECTED_COL]).values
dE_per_atom = dE / total_na
dF = (self.structures_dataframe[FORCES_COL] - energy_forces_pred_df[FORCES_PRED_COL]).values
self.last_epa_mae = np.mean(np.abs(dE_per_atom))
# de = dE #np.hstack(dE.tolist())
# de_pa = dE_per_atom #np.hstack(dE_per_atom.tolist())
df = np.vstack(dF) # np.vstack([v.reshape(-1, 3) for v in dF.tolist()])
self.metrics.compute_metrics(dE.reshape(-1, 1), dE_per_atom.reshape(-1, 1), df,
total_na, dataframe=self.structures_dataframe)
if self.loss_spec.kappa < 1:
# dEsqr = dE ** 2
dEsqr = dE_per_atom ** 2
if EWEIGHTS_COL in self.structures_dataframe.columns:
dEsqr = dEsqr * np.vstack(self.structures_dataframe[EWEIGHTS_COL]).reshape(-1) # structure-wise
e_loss = np.sum(dEsqr)
else:
e_loss = 0
if self.loss_spec.kappa > 0: # forces have contribution to loss function
dFsqr = (self.structures_dataframe[FORCES_COL] - energy_forces_pred_df[FORCES_PRED_COL])
dFsqr = dFsqr.map(lambda f: np.sum(f ** 2, axis=1))
if FWEIGHTS_COL in self.structures_dataframe.columns:
dFsqr = dFsqr * self.structures_dataframe[FWEIGHTS_COL]
dFsqr = dFsqr.map(np.sum)
f_loss = np.sum(dFsqr)
else: # forces have no contribution to loss function
f_loss = 0
basis_coeffs = np.array(self.bbasis.basis_coeffs)
self.l1 = np.sum(np.abs(basis_coeffs))
self.l2 = np.sum(basis_coeffs ** 2)
loss_coeff = \
self.loss_spec.L1_coeffs * self.l1 + self.loss_spec.L2_coeffs * self.l2
loss_crad = 0
if self.loss_spec.w0_rad > 0 or self.loss_spec.w1_rad > 0 or self.loss_spec.w2_rad > 0:
smothness = RadialFunctionSmoothness(RadialFunctionsValues(self.bbasis))
self.smooth_quad = smothness.smooth_quad
loss_crad = self.loss_spec.w0_rad * self.smooth_quad[0] + \
self.loss_spec.w1_rad * self.smooth_quad[1] + \
self.loss_spec.w2_rad * self.smooth_quad[2]
self.last_loss = (1 - self.loss_spec.kappa) * e_loss + self.loss_spec.kappa * f_loss + \
loss_coeff + loss_crad
self.eval_time = time.time() - t0
if self.best_loss is None or self.last_loss < self.best_loss:
self.best_loss = self.last_loss
self.best_params = params
# collect all relevant info into FitMetrics
self.metrics.regs = self.get_reg_components()
self.metrics.reg_weights = self.get_reg_weights()
# already computed above
# self.metrics.compute_metrics(dE.reshape(-1, 1), dE_per_atom.reshape(-1, 1), df,
# total_na, dataframe=self.structures_dataframe)
self.metrics.loss = self.last_loss
self.metrics.eval_time = self.eval_time
# do a snapshot of FitMetrics into FitMetricsData: loss, e_loss, f_loss, reg_loss, RMSE, MAE, MAX_E (E,F), timing
curr_fit_metrics_data = self.metrics.to_FitMetricsDict()
curr_fit_metrics_data["eval_count"] = self.eval_count
curr_fit_metrics_data["iter_num"] = self.iter_num
# store metrics_data into dict (x-> curr_fit_metrics_data)
self.fit_metrics_data_dict[hash(params.tobytes())] = curr_fit_metrics_data
self.last_fit_metric_data = curr_fit_metrics_data
if verbose:
print("Eval {}: loss={}".format(self.eval_count, self.last_loss) + " " * 40 + "\r", end="")
return self.last_loss
[docs] def predict_energy_forces(self, params=None,
structures_dataframe=None,
keep_parallel_dataexecutor=False):
if params is not None:
if isinstance(params, (list, tuple, np.ndarray)):
self.cbasis = self.get_cbasis(params)
elif isinstance(params, ACECTildeBasisSet):
self.cbasis = params
elif isinstance(params, ACEBBasisSet):
self.bbasis = params
self.cbasis = self.bbasis.to_ACECTildeBasisSet()
else:
raise ValueError(
"Type of parameters could be only np.array, list, tuple, ACECTildeBasisSet, ACEBBasisSet" +
"but got {}".format(type(params)))
is_structures_dataframe_refreshed = False
if structures_dataframe is not None:
self.structures_dataframe = structures_dataframe
is_structures_dataframe_refreshed = True
par = partial(batch_compute_energy_forces_function_wrapper, cbasis=self.cbasis)
self._initialize_executor(create_new=is_structures_dataframe_refreshed)
energy_forces_pred = self.data_executor.map(wrapped_pure_func=par)
if not keep_parallel_dataexecutor:
self.data_executor.stop_executor()
energy_forces_pred_df = pd.DataFrame({ENERGY_PRED_COL: energy_forces_pred.map(lambda d: d[0]),
FORCES_PRED_COL: energy_forces_pred.map(lambda d: np.array(d[1]))},
index=energy_forces_pred.index)
return energy_forces_pred_df
[docs] def predict(self, structures_dataframe=None):
return self.predict_energy_forces(structures_dataframe=structures_dataframe)
[docs] def predict_projections(self, params=None,
structures_dataframe=None,
keep_parallel_dataexecutor=False):
is_structures_dataframe_refreshed = False
if structures_dataframe is not None:
self.structures_dataframe = structures_dataframe
is_structures_dataframe_refreshed = True
potential_params = None
if params is not None:
if isinstance(params, (list, tuple, np.ndarray)):
self.cbasis = self.get_cbasis(params)
potential_params = self.cbasis
elif isinstance(params, ACECTildeBasisSet):
self.cbasis = params
potential_params = self.cbasis
elif isinstance(params, ACEBBasisSet):
self.bbasis = params
potential_params = self.bbasis
elif isinstance(params, BBasisConfiguration):
potential_params = params
else:
raise ValueError(
"Type of parameters could be only np.array, list, tuple, ACECTildeBasisSet, ACEBBasisSet" +
"but got {}".format(type(params)))
elif self.bbasis is not None:
log.info("No 'params' provided to predict_projections, bbasis will be used")
potential_params = self.bbasis
else:
raise ValueError(
"Basis is not set. provide `params` argument or create PyACEFit with some predefined basis")
par = partial(batch_compute_projections_function_wrapper, potential_params=potential_params)
self._initialize_executor(create_new=is_structures_dataframe_refreshed)
projections_pred_df = self.data_executor.map(wrapped_pure_func=par)
if not keep_parallel_dataexecutor:
self.data_executor.stop_executor()
return projections_pred_df
[docs] def get_reg_components(self):
if self.smooth_quad is not None:
return [self.l1, self.l2, self.smooth_quad[0], self.smooth_quad[1], self.smooth_quad[2]]
else:
return [self.l1, self.l2, 0., 0., 0.]
[docs] def get_reg_weights(self):
return [self.loss_spec.L1_coeffs, self.loss_spec.L2_coeffs,
self.loss_spec.w0_rad, self.loss_spec.w1_rad, self.loss_spec.w2_rad]
[docs] def get_fitting_data(self):
return self.structures_dataframe
def _callback(self, x, *args, **kwargs):
self.iter_num += 1
# call global callback
self.metrics.record_time(self.eval_time)
self.update_last_metrics(x)
if self.fit_metric_callback is not None:
# example of fit_metric_callback is: print_extended_metrics
self.fit_metric_callback(self.last_fit_metric_data)
else:
if self.iter_num % self.display_step == 0:
MetricsAggregator.print_extended_metrics(self.last_fit_metric_data)
else:
MetricsAggregator.print_detailed_metrics(self.last_fit_metric_data)
# clean self.fit_metrics_data_dict={} for next iteration
# self.fit_metrics_data_dict = {}
if self.global_callback is not None:
self.global_callback(self.bbasis.to_BBasisConfiguration())
[docs] def fit(self, structures_dataframe, method="Nelder-Mead",
options=None, callback=None, verbose=True, fit_metric_callback=None):
if options is None:
options = {"maxiter": 100, "disp": True}
if fit_metric_callback is not None:
self.fit_metric_callback = fit_metric_callback
if structures_dataframe is not None:
self.preprocess_dataframe(structures_dataframe)
self.structures_dataframe = structures_dataframe
else:
raise ValueError("structures_dataframe couldn't be None")
self.global_callback = callback
log.info("Data size:" + str(self.structures_dataframe.shape))
# log.debug("self.structures_dataframe.columns = " + str(self.structures_dataframe.columns))
log.info("Energy weights : " + str(EWEIGHTS_COL in self.structures_dataframe.columns))
log.info("Forces weights : " + str(FWEIGHTS_COL in self.structures_dataframe.columns))
self.eval_count = 0
self.best_loss = None
log.info('Number of trainable parameters: {0}'.format(len(self.trainable_params)))
# self.setup_metrics() # no need, because .metrics is a property with auto initialization
self._initialize_executor()
if 'disp' not in options:
options['disp'] = True
if 'gtol' not in options:
options['gtol'] = 1e-8
log.info('Scipy minimize: method = {}, options = {}'.format(method, options))
self.initial_loss = self.loss(self.trainable_params)
if self.fit_metric_callback is not None:
self.fit_metric_callback(self.last_fit_metric_data)
else:
MetricsAggregator.print_detailed_metrics(self.last_fit_metric_data, title='Initial state:')
MetricsAggregator.print_extended_metrics(self.last_fit_metric_data, title='INIT STATS')
self.fit_metrics_data_dict = {}
res_opt = minimize(self.loss, x0=self.trainable_params, args=(verbose,), method=method, options=options,
callback=self._callback)
self.res_opt = res_opt
self.params_opt = res_opt.x
self.bbasis_opt = self.update_bbasis(self.params_opt)
self.cbasis_opt = self.bbasis.to_ACECTildeBasisSet()
x = res_opt.x
self.update_last_metrics(x)
# TODO: call self.fit_metric_callback one more time?
[docs] def update_last_metrics(self, x):
x_hash = hash(x.tobytes())
true_fit_metric_data = self.fit_metrics_data_dict[x_hash]
true_fit_metric_data["iter_num"] = self.iter_num
true_fit_metric_data["eval_count"] = self.eval_count
# update self.metrics with this true metrics data
self.metrics.from_FitMetricsDict(true_fit_metric_data)
self.last_fit_metric_data = true_fit_metric_data
return true_fit_metric_data
[docs] def setup_metrics(self):
w_e = np.hstack(self.structures_dataframe[EWEIGHTS_COL].tolist())
w_f = np.hstack(self.structures_dataframe[FWEIGHTS_COL].tolist())
self._metrics = FitMetrics(w_e.reshape(-1, 1), w_f.reshape(-1, 1), 1. - self.loss_spec.kappa,
self.loss_spec.kappa, len(self.trainable_params))
self._metrics.nfuncs = self.nfuncs
def _initialize_executor(self, create_new=False):
if self.data_executor is None or create_new:
self.data_executor = ParallelDataExecutor(distributed_data=self.structures_dataframe[ATOMIC_ENV_COL],
**self.executors_kw_args)