Source code for RCAIDE.Framework.Optimization.Packages.additive.additive_setup


# additive_setup.py
#
# Created:  Apr 2017, T. MacDonald
# Modified: Jun 2017, T. MacDonald
#           Oct 2019, T. MacDonald
#           Jun 2020, M. Clarke

# ----------------------------------------------------------------------
#  Imports
# ----------------------------------------------------------------------

import numpy as np
import RCAIDE
try:
    import pyOpt
    import pyOpt.pySNOPT
    import pyOpt.pyALPSO
except:
    pass
import sklearn
from sklearn import gaussian_process
from RCAIDE.Framework.Optimization.Common import helper_functions as help_fun
from RCAIDE.Library.Methods.Utilities.latin_hypercube_sampling import latin_hypercube_sampling
from scipy.stats import norm
import os
import sys
from scipy.optimize import minimize
from scipy.optimize import shgo

# ----------------------------------------------------------------------
#  Additive Solve Functions
# ----------------------------------------------------------------------
[docs] class Additive_Solver():
[docs] def __init__(self): # These defaults are chosen since they are built in to scipy and # therefore are always available when running RCAIDE self.local_optimizer = 'SLSQP' self.global_optimizer = 'SHGO' return
[docs] def Additive_Solve(self,problem,num_fidelity_levels=2,num_samples=10,max_iterations=10, tolerance=1e-6,opt_type='basic',num_starts=3,print_output=True): """Solves a multifidelity problem using an additive corrections Assumptions: N/A Source: N/A Inputs: problem [nexus()] num_fidelity_levels [int] num_samples [int] max_iterations [int] tolerance [float] opt_type [str] num_starts [int] print_output [bool] Outputs: (fOpt,xOpt) [tuple] Properties Used: N/A """ if print_output == False: devnull = open(os.devnull,'w') sys.stdout = devnull if num_fidelity_levels != 2: raise NotImplementedError('Additive corrections are only implemented for 2 fidelity levels.') # History writing f_out = open('add_hist.txt','w') import datetime f_out.write(str(datetime.datetime.now())+'\n') inp = problem.optimization_problem.inputs obj = problem.optimization_problem.objective con = problem.optimization_problem.constraints # Set inputs nam = inp[:,0] # Names ini = inp[:,1] # Initials bnd = inp[:,2] # Bounds scl = inp[:,3] # Scale typ = inp[:,4] # Type (x,scaled_constraints,x_low_bound,x_up_bound,con_up_edge,con_low_edge) = self.scale_vals(inp, con, ini, bnd, scl) # Get initial set of samples x_samples = latin_hypercube_sampling(len(x),num_samples,bounds=(x_low_bound,x_up_bound),criterion='center') # Initialize objective and constraint variables f = np.zeros([num_fidelity_levels,num_samples]) g = np.zeros([num_fidelity_levels,num_samples,len(scaled_constraints)]) for level in range(1,num_fidelity_levels+1): problem.fidelity_level = level for ii,x in enumerate(x_samples): res = self.evaluate_model(problem,x,scaled_constraints) f[level-1,ii] = res[0] # objective value g[level-1,ii,:] = res[1] # constraints vector converged = False for kk in range(max_iterations): # Build objective surrogate f_diff = f[1,:] - f[0,:] f_additive_surrogate_base = gaussian_process.GaussianProcessRegressor() f_additive_surrogate = f_additive_surrogate_base.fit(x_samples, f_diff) # Build constraint surrogate g_diff = g[1,:] - g[0,:] g_additive_surrogate_base = gaussian_process.GaussianProcessRegressor() g_additive_surrogate = g_additive_surrogate_base.fit(x_samples, g_diff) # Optimize corrected model # Chose method --------------- if opt_type == 'basic': # Next point determined by surrogate optimum problem.fidelity_level = 1 x_eval = latin_hypercube_sampling(len(x),1,bounds=(x_low_bound,x_up_bound),criterion='random')[0] if self.local_optimizer == 'SNOPT': opt_prob = pyOpt.Optimization('RCAIDE',self.evaluate_corrected_model, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate) # Set up opt_prob self.initialize_opt_vals(opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval) opt = pyOpt.pySNOPT.SNOPT() outputs = opt(opt_prob, sens_type='FD',problem=problem, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate)#, sens_step = sense_step) fOpt = outputs[0][0] xOpt = outputs[1] elif self.local_optimizer == 'SLSQP': x0,constraints = self.initialize_opt_vals_SLSQP(obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval,problem,g_additive_surrogate) res = minimize(self.evaluate_corrected_model, x0,constraints=constraints,args=(problem,f_additive_surrogate,g_additive_surrogate),options={'ftol':1e-6,'disp':True}) fOpt = res['fun'] xOpt = res['x'] else: raise NotImplementedError elif opt_type == 'MEI': # Next point determined by maximum expected improvement fstar = np.min(f[1,:]) problem.fidelity_level = 1 if self.global_optimizer == 'ALPSO': opt_prob = pyOpt.Optimization('RCAIDE',self.evaluate_expected_improvement, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate,fstar=fstar) # Set up opt_prob self.initialize_opt_vals(opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,None) # Use a global optimizer opt = pyOpt.pyALPSO.ALPSO() opt.setOption('maxOuterIter',value=20) opt.setOption('seed',value=1.) outputs = opt(opt_prob,problem=problem, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate,fstar=fstar,cons=con)#, sens_step = sense_step) fOpt = np.nan imOpt = outputs[0] xOpt = outputs[1] elif self.global_optimizer == 'SHGO': xb, shgo_cons = self.initialize_opt_vals_SHGO(obj, inp, x_low_bound, x_up_bound, con_low_edge, con_up_edge, nam, con, problem, g_additive_surrogate) #self.global_optimizer = 'SHGO' options = {} res = shgo(self.evaluate_expected_improvement, xb, iters=2, args=(problem,f_additive_surrogate,g_additive_surrogate,fstar),constraints=shgo_cons,options=options) fOpt = np.nan imOpt = res['fun'] xOpt = res['x'] else: raise NotImplementedError # --------------------------------- complete_flag = False if np.any(np.isnan(xOpt)): complete_flag = True else: # Add new samples and check objective and constraint values f = np.hstack((f,np.zeros((num_fidelity_levels,1)))) g = np.hstack((g,np.zeros((num_fidelity_levels,1,len(con))))) x_samples = np.vstack((x_samples,xOpt)) for level in range(1,num_fidelity_levels+1): problem.fidelity_level = level res = self.evaluate_model(problem,xOpt,scaled_constraints) f[level-1][-1] = res[0] g[level-1][-1] = res[1] # History writing f_out.write('Iteration: ' + str(kk+1) + '\n') f_out.write('x0 : ' + str(xOpt[0]) + '\n') f_out.write('x1 : ' + str(xOpt[1]) + '\n') if opt_type == 'basic': f_out.write('expd hi : ' + str(fOpt) + '\n') elif opt_type == 'MEI': f_out.write('expd imp : ' + str(imOpt) + '\n') f_out.write('low obj : ' + str(f[0][-1]) + '\n') f_out.write('hi obj : ' + str(f[1][-1]) + '\n') if kk == (max_iterations-1) or complete_flag == True: # Reached maximum number of iterations f_diff = f[1,:] - f[0,:] if opt_type == 'basic': # If basic setting f already has the expected optimum problem.fidelity_level = 2 fOpt = self.evaluate_model(problem,xOpt,scaled_constraints)[0][0] elif opt_type == 'MEI': # If MEI, find the optimum of the final surrogate min_ind = np.argmin(f[1]) x_eval = x_samples[min_ind] if self.local_optimizer == 'SNOPT': opt_prob = pyOpt.Optimization('RCAIDE',self.evaluate_corrected_model, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate) # Set up opt_prob self.initialize_opt_vals(opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval) fOpt, xOpt = self.run_objective_optimization(opt_prob,problem,f_additive_surrogate,g_additive_surrogate) elif self.local_optimizer == 'SLSQP': problem.fidelity_level = 1 x0,constraints = self.initialize_opt_vals_SLSQP(obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval,problem,g_additive_surrogate) res = minimize(self.evaluate_corrected_model, x0,constraints=constraints,args=(problem,f_additive_surrogate,g_additive_surrogate),options={'ftol':1e-6,'disp':True}) fOpt = res['fun'] xOpt = res['x'] problem.fidelity_level = 2 fOpt = self.evaluate_model(problem,xOpt,scaled_constraints)[0][0] f_out.write('x0_opt : ' + str(xOpt[0]) + '\n') f_out.write('x1_opt : ' + str(xOpt[1]) + '\n') f_out.write('final opt : ' + str(fOpt) + '\n') print('Iteration Limit Reached') break if np.abs(fOpt-f[1][-1]) < tolerance: # Converged within a tolerance print('Convergence reached') f_out.write('Convergence reached') f_diff = f[1,:] - f[0,:] converged = True if opt_type == 'MEI': problem.fidelity_level = 1 min_ind = np.argmin(f[1]) x_eval = x_samples[min_ind] if self.local_optimizer == 'SNOPT': opt_prob = pyOpt.Optimization('RCAIDE',self.evaluate_corrected_model, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate) initalize_opt_vals(opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval) opt = pyOpt.pySNOPT.SNOPT() outputs = opt(opt_prob, sens_type='FD',problem=problem, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate)#, sens_step = sense_step) fOpt = outputs[0][0] xOpt = outputs[1] elif self.local_optimizer == 'SLSQP': x0,constraints = self.initialize_opt_vals_SLSQP(opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval,problem,g_additive_surrogate) res = minimize(self.evaluate_corrected_model, x0,constraints=constraints,args=(problem,f_additive_surrogate,g_additive_surrogate),options={'ftol':1e-6,'disp':True}) fOpt = res['fun'] xOpt = res['x'] else: raise NotImplementedError problem.fidelity_level = 2 fOpt = self.evaluate_model(problem,xOpt,scaled_constraints)[0][0] f_out.write('x0_opt : ' + str(xOpt[0]) + '\n') f_out.write('x1_opt : ' + str(xOpt[1]) + '\n') f_out.write('final opt : ' + str(fOpt) + '\n') break fOpt = f[1][-1]*1. if converged == False: print('Iteration Limit reached') f_out.write('Maximum iteration limit reached') # Save sample data np.save('x_samples.npy',x_samples) np.save('f_data.npy',f) f_out.close() print(fOpt,xOpt) if print_output == False: sys.stdout = sys.__stdout__ # Format objective function to array, ensure output consistency if np.isscalar(fOpt): FOpt = np.array([fOpt]) else: FOpt = fOpt.astype(np.double) return (FOpt,xOpt)
[docs] def evaluate_model(self,problem,x,cons): """Solves the optimization problem to get the objective and constraints Assumptions: N/A Source: N/A Inputs: problem [nexus()] x [array] cons [array] Outputs: f [float] g [array] Properties Used: N/A """ f = np.array(0.) g = np.zeros(np.shape(cons)) f = problem.objective(x) g = problem.all_constraints(x) return f,g
[docs] def evaluate_corrected_model(self,x,problem=None,obj_surrogate=None,cons_surrogate=None): """Evaluates the corrected model with the low fidelity plus the corrections Assumptions: N/A Source: N/A Inputs: x [array] problem [nexus()] obj_surrogate [fun()] cons_surrogate [fun()] Outputs: obj [float] const [array] fail [bool] Properties Used: N/A """ obj = problem.objective(x) const = problem.all_constraints(x).tolist() fail = np.array(np.isnan(obj.tolist()) or np.isnan(np.array(const).any())).astype(int) obj_addition = obj_surrogate.predict(np.atleast_2d(x)) cons_addition = cons_surrogate.predict(np.atleast_2d(x)) obj = obj + obj_addition const = const + cons_addition const = const.tolist()[0] self.const_list = const print('Inputs') print(x) print('Obj') print(obj) print('Con') print(const) if self.local_optimizer == 'SNOPT': return obj,const,fail elif self.local_optimizer == 'SLSQP': return obj else: raise NotImplementedError('Selected local optimizer is not implemented.')
[docs] def evaluate_expected_improvement(self,x,problem=None,obj_surrogate=None,cons_surrogate=None,fstar=np.inf,cons=None): """Evaluates the expected improvement of the point x Assumptions: N/A Source: N/A Inputs: x [array] problem [nexus()] obj_surrogate [fun()] cons_surrogate [fun()] fstar [float] cons [vector] Outputs: -EI [float] const [array] fail [bool] Properties Used: N/A """ if np.any(np.isnan(x)): if self.global_optimizer == 'ALPSO': raise ValueError('Unknown error in ALPSO optimizer created NaN values.') return np.inf obj = problem.objective(x) const = problem.all_constraints(x).tolist() fail = np.array(np.isnan(obj.tolist()) or np.isnan(np.array(const).any())).astype(int) # Get uncertainty information obj_addition, obj_sigma = obj_surrogate.predict(np.atleast_2d(x),return_std=True) cons_addition, cons_sigma = cons_surrogate.predict(np.atleast_2d(x),return_std=True) fhat = obj[0] + obj_addition # Calculate expected improvement (based on Schonlau, Computer Experiments and Global Optimization, 1997) EI = (fstar-fhat)*norm.cdf((fstar-fhat)/obj_sigma) + obj_sigma*norm.pdf((fstar-fhat)/obj_sigma) const = const + cons_addition EI = np.log(EI) if EI == -np.inf: EI = -1000 if self.global_optimizer == 'ALPSO': # Adjust signs for optimizer (this is specific to ALPSO) signs = np.ones([1,len(cons)]) offset = np.zeros([1,len(cons)]) for ii,con in enumerate(cons): if cons[ii][1] == '>': signs[0,ii] = -1 offset[0,ii] = cons[ii][2] const = const*signs - offset*signs const = const.tolist()[0] print('Inputs') print(x) print('Obj') print(-EI) print('Con') print(const) return -EI,const,fail elif self.global_optimizer == 'SHGO': return -EI
[docs] def scale_vals(self,inp,con,ini,bnd,scl): """Scales values to help setup the problem Assumptions: N/A Source: N/A Inputs: inp [array] con [array] ini [array] bnd [array] scl [array] Outputs: tuple: x [array] scaled_constraints [array] x_low_bounds [array] x_up_bounds [array] con_up_edge [array] con_low_edge [array] Properties Used: N/A """ # Pull out the constraints and scale them bnd_constraints = help_fun.scale_const_bnds(con) scaled_constraints = help_fun.scale_const_values(con,bnd_constraints) x = ini/scl x_low_bound = [] x_up_bound = [] edge = [] con_up_edge = [] con_low_edge = [] for ii in range(0,len(inp)): x_low_bound.append(bnd[ii][0]/scl[ii]) x_up_bound.append(bnd[ii][1]/scl[ii]) for ii in range(0,len(con)): edge.append(scaled_constraints[ii]) if con[ii][1]=='<': con_up_edge.append(edge[ii]) con_low_edge.append(-np.inf) elif con[ii][1]=='>': con_up_edge.append(np.inf) con_low_edge.append(edge[ii]) elif con[ii][1]=='=': con_up_edge.append(edge[ii]) con_low_edge.append(edge[ii]) x_low_bound = np.array(x_low_bound) x_up_bound = np.array(x_up_bound) con_up_edge = np.array(con_up_edge) con_low_edge = np.array(con_low_edge) return (x,scaled_constraints,x_low_bound,x_up_bound,con_up_edge,con_low_edge)
[docs] def initialize_opt_vals(self,opt_prob,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval): """Sets up the optimization values Assumptions: N/A Source: N/A Inputs: opt_prob [pyopt_problem()] obj [float] inp [array] x_low_bound [array] x_up_bound [array] con_low_edge [array] con_up_edge [array] nam [list of str] con [array] x_eval [array] Outputs: N/A Properties Used: N/A """ for ii in range(len(obj)): opt_prob.addObj('f',100) for ii in range(0,len(inp)): vartype = 'c' if x_eval is None: opt_prob.addVar(nam[ii],vartype,lower=x_low_bound[ii],upper=x_up_bound[ii]) else: opt_prob.addVar(nam[ii],vartype,lower=x_low_bound[ii],upper=x_up_bound[ii],value=x_eval[ii]) for ii in range(0,len(con)): if con[ii][1]=='<': opt_prob.addCon(nam[ii], type='i', upper=con_up_edge[ii]) elif con[ii][1]=='>': opt_prob.addCon(nam[ii], type='i', lower=con_low_edge[ii],upper=np.inf) elif con[ii][1]=='=': opt_prob.addCon(nam[ii], type='e', equal=con_up_edge[ii]) return
[docs] def unpack_constraints_slsqp(self,x,con_ind,sign,edge,problem,cons_surrogate): if np.any(np.isnan(x)): return np.inf const = problem.all_constraints(x).tolist() cons_addition = cons_surrogate.predict(np.atleast_2d(x)) const = const + cons_addition const_list = const.tolist()[0] con = (const_list[con_ind]-edge)*sign return con
[docs] def initialize_opt_vals_SLSQP(self,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,x_eval,problem,cons_surr): # Initialize variables according to SLSQP requirements x0 = x_eval slsqp_con_list = [] for i,c in enumerate(con): c_dict = {} if c[1] == '<' or c[1] == '>': c_dict['type'] = 'ineq' elif c[1] == '=': c_dict['type'] = 'eq' else: raise ValueError('Constraint specification not recognized.') c_dict['fun'] = self.unpack_constraints_slsqp if c[1] == '<': c_dict['args'] = [i,-1,c[2],problem,cons_surr] else: c_dict['args'] = [i,1,c[2],problem,cons_surr] slsqp_con_list.append(c_dict) return x0,slsqp_con_list
[docs] def initialize_opt_vals_SHGO(self,obj,inp,x_low_bound,x_up_bound,con_low_edge,con_up_edge,nam,con,problem,cons_surr): # Initialize variables according to SLSQP requirements bound_list = [] for i in range(len(x_low_bound)): bound_list.append((x_low_bound[i],x_up_bound[i])) slsqp_con_list = [] for i,c in enumerate(con): c_dict = {} if c[1] == '<' or c[1] == '>': c_dict['type'] = 'ineq' elif c[1] == '=': c_dict['type'] = 'eq' else: raise ValueError('Constraint specification not recognized.') c_dict['fun'] = self.unpack_constraints_slsqp if c[1] == '<': c_dict['args'] = [i,-1,c[2],problem,cons_surr] else: c_dict['args'] = [i,1,c[2],problem,cons_surr] slsqp_con_list.append(c_dict) return bound_list,slsqp_con_list
[docs] def run_objective_optimization(self,opt_prob,problem,f_additive_surrogate,g_additive_surrogate): """Runs SNOPT to optimize Assumptions: N/A Source: N/A Inputs: opt_prob [pyopt_problem()] problem [nexus()] f_additive_surrogate [fun()] g_additive_surrogate [fun()] Outputs: fOpt [float] xOpt [array] Properties Used: N/A """ opt = pyOpt.pySNOPT.SNOPT() problem.fidelity_level = 1 outputs = opt(opt_prob, sens_type='FD',problem=problem, \ obj_surrogate=f_additive_surrogate,cons_surrogate=g_additive_surrogate)#, sens_step = sense_step) fOpt = outputs[0][0] xOpt = outputs[1] return fOpt, xOpt