matlanticgrrm.optimize のソースコード

import pstats
from xml.dom.pulldom import PROCESSING_INSTRUCTION
import ase
from ase import Atoms
from ase.neb import NEB
from ase.io import Trajectory
from ase.optimize import BFGS, LBFGS,LBFGSLineSearch,BFGSLineSearch,FIRE
# from sella import Sella, Constraints

from math import sqrt
from pathlib import Path
import pickle
from logging import getLogger,StreamHandler,FileHandler,Formatter,DEBUG,INFO,WARNING,ERROR,CRITICAL
import numpy as np
from tqdm.notebook import tqdm
from timeout_decorator import timeout, TimeoutError
import sys,os,pathlib,logging,math

from .log import reset_logger, add_filehandler
from .neb import ListNEB

[ドキュメント]class ListOptimizer(): """全てのOptimizerの親クラス Parameters: atoms_list: list of Atoms Atomsのリスト, NEB計算の時はListNEBオブジェクト savename: str | 計算実行(run)中に`{savename}.trajと{savename}.pickleが逐次作成される. | trajファイルには計算後の構造,pickleファイルには収束,未収束などの計算結果の情報が記録される. logfolder: str or path object ASEのlogfile引数に相当するが,ASEとは異なり,複数のlogファイルが作成されるためフォルダ名を指定する trajfolder: str or path object ASEのtrajectory引数に相当するが,ASEとは異なり,複数のtrajファイルが作成されるためフォルダ名を指定する errorlog: str or path object | 計算実行(run)中にerrorlogファイルが作成される. | 内容はsavenameの pickleファイルと似ているがテキストファイルで出力される. indexes: list of integers 特定の要素のみを計算したい場合にindex番号のリストで計算対象を指定する. max_fmax: float 初期構造のforceがmax_fmaxを上回った際には計算を実行しない. 特にNEB計算の際には役立つ. timeout: integer 1回のイタレーションにtimeout(秒)以上かかる際に計算を途中で諦める. ListSellaを使ったTS計算の際に特に役立つ maxstep: float | 1回のイタレーションで動く原子の最大値. VASPのPOTIMに相当. | MatlanticGrrmでなくASEで用意されている引数. """ def __init__(self,atoms_list:list,savename:str,logfolder:str,trajfolder:str,errorlog:str,indexes:list=None, max_fmax:float=10000,timeout:int=60*20,#ここまでがユーザー定義の引数 restart=None,maxstep=None,master=None,force_consistent=None): ##user定義## self.atoms_list = atoms_list self.savename = savename # 計算(run)すると{sefl.savename}.trajと{sefl.savename}.pickleが作成される self.logfolder = logfolder self.trajfolder = trajfolder self.errorlog = errorlog self.indexes = indexes self.max_fmax = max_fmax self.timeout = timeout self.state_list = [] # pickleファイルに保存するリスト ##ase由来## self.restart = restart self.maxstep = maxstep self.master = master self.force_consistent = force_consistent if type(self.atoms_list)==ListNEB: """NEB計算を行なう場合trajに保存すべきは,ListNEN.imagess""" self.save_datas = self.atoms_list.imagess else: """それ以外の計算ではAtomsリストをtrajに保存""" self.save_datas = self.atoms_list if self.indexes is None: self.indexes = [i for i in range(len(self.atoms_list))] if self.logfolder is not None: self.logfolder = Path(logfolder) if not self.logfolder.exists(): os.makedirs(self.logfolder) self.logfiles = [self.logfolder/f"{i}.log" for i in range(len(self.atoms_list))] else: self.logfiles = [None for _ in range(len(self.atoms_list))] if self.trajfolder: self.trajfolder = Path(trajfolder) if not self.trajfolder.exists(): os.makedirs(self.trajfolder) self.trajectorys = [self.trajfolder/f"{i}.traj" for i in range(len(self.atoms_list))] self.trajectorys = [str(path) for path in self.trajectorys] else: self.trajectorys = [None for _ in range(len(self.atoms_list))] def start_number(self): """何番目から計算を開始するか stop.logが存在する時はstpo.logに書かれている番号から計算を開始する """ self.logger = reset_logger() #loggerをリセット if Path("stop.log").exists(): with open("stop.log","r") as f: n = int(f.read()) #n番目から計算を始める os.remove("stop.log") add_filehandler(self.logger,"a",self.errorlog,DEBUG) else: n = 0 add_filehandler(self.logger,"w",self.errorlog,DEBUG) return n def write_stop_log(self,i): with open("stop.log","w") as f: f.write(str(i)) def ase_run(self,atoms,i,fmax,steps): opt = self.mk_opt_func(atoms,self.logfiles[i],self.trajectorys[i]) @timeout(self.timeout) def _internal(opt): opt.step() def irun(opt): # compute initial structure and log the first step opt.atoms.get_forces() # yield the first time to inspect before logging yield False if opt.nsteps == 0: opt.log() opt.call_observers() # run the algorithm until converged or max_steps reached while not opt.converged() and opt.nsteps < opt.max_steps: # compute the next step try: _internal(opt) except TimeoutError: self.dnf = True break opt.nsteps += 1 # let the user inspect the step and change things before logging # and predicting the next step yield False # log the step opt.log() opt.call_observers() # finally check if algorithm was converged yield opt.converged() opt.fmax = fmax if steps: opt.max_steps = steps for converged in irun(opt): pass return opt def save_object(self,atoms,mode): if type(self.atoms_list) == ListNEB: """NEBの時""" if not atoms: # objectが存在しない時 atoms = [Atoms() for _ in range(self.atoms_list.n_images)] n_images = len(atoms) for i,atm in enumerate(atoms): if mode == "w" and i == 0: Trajectory(f"{self.savename}.traj", mode="w", atoms=atm).write() else: Trajectory(f"{self.savename}.traj", mode="a", atoms=atm).write() else: """それ以外""" n_images = 1 if not atoms: atoms = Atoms() Trajectory(f"{self.savename}.traj", mode=mode, atoms=atoms).write() with open(f"{self.savename}.pickle","wb") as f: pickle.dump((self.state_list, n_images), f) def write_log(self,i,n,*comment): self.state_list.append(n) if n == 0: self.logger.debug(f"{i}: 完了 force={comment[0]} state {n}") elif n == 1: self.logger.info(f"{i}: index未指定 state {n}") elif n == 2: self.logger.info(f"{i}: objectが存在しない state {n}") elif n == 3: self.logger.warning(f"{i}: np.linalg.LinAlgError force={comment[0]} state {n}") elif n == 4: self.logger.warning(f"{i}: 未収束 force={comment[0]} state {n}") elif n == 5: self.logger.warning(f"{i}: TimeOut force={comment[0]} state {n}") elif n == 6: self.logger.warning(f"{i}: forceがmax_fmaxを上回っている force={comment[0]} state {n}") elif n == 7: self.logger.error(f"{i}: force計算不可 state {n}") elif n == 8: self.logger.critical(f"{i}: {comment[0]} {comment[1]} {comment[2]} state {n}") def initialize(self,i,atoms): """計算する前に構造やオブジェクトの有無を確認する param: i: int i番目の計算 atoms: Atoms or NEB i番目のAtomsオブジェクト. NEB計算の時はNEBオブジェクト Return: 1つでもひっかればFalseを返す """ self.dnf = False # タイムアウトした場合True. Do not finished if not i in self.indexes: """indexesに与えられてない時""" self.write_log(i,1) return False elif not atoms: """Atoms or NEBオブジェクトが存在しない(None)の時""" self.write_log(i,2) return False try: force = sqrt((atoms.get_forces()**2).sum(axis=1).max()) if force > self.max_fmax: """force > max_fmaxの時""" self.write_log(i,6,force) return False elif math.isnan(sqrt((atoms.get_forces()**2).sum(axis=1).max())): """force="nan"の時""" self.write_log(i,7) return False else: return True except: self.write_log(i,7) return False
[ドキュメント] def run(self,fmax=0.05, steps=100000000): """計算を実行する Parameters: fmax: 収束条件となるforce steps: integer 最大ステップ数 """ n = self.start_number() #n番目から計算を始める for i, atoms in enumerate(tqdm(self.atoms_list[n:]),start=n): if Path("stop.log").exists(): self.write_stop_log(i) break if self.initialize(i,atoms): try: opt = self.ase_run(atoms,i,fmax,steps) force = round(sqrt((atoms.get_forces()**2).sum(axis=1).max()),4) if opt.converged(): self.write_log(i,0,force) else: if self.dnf: self.write_log(i,5,force) else: self.write_log(i,4,force) except Exception as e: exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] self.write_log(i,8,exc_type,fname,exc_tb.tb_lineno) # trajファイルに保存する if i == 0 and n == 0: #始めは上書き保存 self.save_object(self.save_datas[i],mode="w") else: self.save_object(self.save_datas[i],mode="a")
[ドキュメント]class ListBFGS(ListOptimizer): def __init__(self, atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None,max_fmax=100000,timeout=60*20, restart=None, maxstep=None, master=None, alpha=None): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,maxstep,master) self.alpha = alpha def mk_opt_func(self,atoms,logfile,trajectory): return BFGS(atoms,restart=self.restart,logfile=logfile,trajectory=trajectory, maxstep=self.maxstep,master=self.master,alpha=self.alpha)
[ドキュメント]class ListLBFGS(ListOptimizer): """ase.optimize.LBFGSをase.Atomsのリストを引数に使用できるようにしたもの. """ def __init__(self, atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None,max_fmax=100000,timeout=60*20, restart=None, maxstep=None, memory=100, damping=1.0, alpha=70.0,use_line_search=False, master=None,force_consistent=None): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,maxstep,master) self.memory = memory self.damping = damping self.alpha = alpha self.use_line_search = use_line_search self.force_consistent = force_consistent def mk_opt_func(self,atoms,logfile,trajectory): return LBFGS(atoms,restart=self.restart,logfile=logfile,trajectory=trajectory, maxstep=self.maxstep,memory=self.memory,damping=self.damping, alpha=self.alpha,use_line_search=self.use_line_search,master=self.master, force_consistent=self.force_consistent)
[ドキュメント]class ListBFGSLineSearch(ListLBFGS): def __init__(self, atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None, max_fmax=100000,timeout=60*20, restart=None, maxstep=None, c1=0.23, c2=0.46, alpha=10.0, stpmax=50.0, master=None, force_consistent=None): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,maxstep,master,force_consistent=force_consistent) self.c1 = c1 self.c2 = c2 self.alpha = alpha self.stpmax = stpmax def mk_opt_func(self,atoms,logfile,trajectory): return BFGSLineSearch(atoms, restart=self.restart, logfile=logfile, maxstep=self.maxstep, trajectory=trajectory, c1=self.c1, c2=self.c2, alpha=self.alpha, stpmax=self.stpmax, master=self.master, force_consistent=self.force_consistent)
[ドキュメント]class ListLBFGSLineSearch(ListLBFGS): def __init__(self, atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None, max_fmax=100000, timeout=60*20, restart=None,maxstep=None, memory=100, damping=1.0, alpha=70.0, use_line_search=False, master=None, force_consistent=None): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,maxstep,master,force_consistent=force_consistent) self.memory = memory self.damping = damping self.alpha = alpha self.use_line_search = use_line_search def mk_opt_func(self,atoms,logfile,trajectory): return LBFGSLineSearch(atoms=atoms, restart=self.restart, logfile=logfile, trajectory=trajectory, maxstep=self.maxstep,memory=self.memory, damping=self.damping, alpha=self.alpha,use_line_search=self.use_line_search, master=self.master, force_consistent=self.force_consistent)
[ドキュメント]class ListFIRE(ListOptimizer): def __init__(self, atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None, max_fmax=100000, timeout=60*20, restart=None, dt=0.1, maxstep=None, maxmove=None, dtmax=1.0, Nmin=5, finc=1.1, fdec=0.5, astart=0.1, fa=0.99, a=0.1, master=None, downhill_check=False, position_reset_callback=None, force_consistent=None): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,maxstep,master,force_consistent=force_consistent) self.dt = dt self.maxmove = maxmove self.dtmax = dtmax self.Nmin = Nmin self.finc = finc self.fdec = fdec self.astart = astart self.fa = fa self.a = a self.downhill_check = downhill_check self.position_reset_callback = position_reset_callback def mk_opt_func(self,atoms,logfile,trajectory): return FIRE(atoms=atoms,restart=self.restart,logfile=logfile,trajectory=trajectory, dt=self.dt,maxstep=self.maxstep,maxmove=self.maxmove,dtmax=self.dtmax, Nmin=self.Nmin,finc=self.finc,fdec=self.fdec,astart=self.astart,fa=self.fa, a=self.a,master=self.master,downhill_check=self.downhill_check, position_reset_callback=self.position_reset_callback, force_consistent=self.force_consistent)
[ドキュメント]class ListSella(ListOptimizer): """ :sella:`sella.Sella <optimize/optimize.py>` の第一引数にAtomsリストを使用できるように拡張したクラス Parameters: retry: integer | Sellaの計算ではnumpy.linalg.LinAlgErrorで計算が終了する場合が多いが, 多くの場合,再計算を行なうことで問題が解決する. | retry(回)まではLinAlgErrorが発生しても再計算を行なうようにする. その他の引数は :class:`ListBFGS` と :sella:`sella.Sella <optimize/optimize.py>` を参照 """ def __init__(self,atoms_list, savename, logfolder=None, errorlog="error.log", trajfolder=None, indexes=None, max_fmax=100000, timeout=60*20, retry=0, restart=None, master=None, force_consistent=False, delta0=None, sigma_inc=None, sigma_dec=None, rho_dec=None, rho_inc=None,order=1, eig=None, eta=1e-4, method=None, gamma=0.1, threepoint=False, constraints=None, constraints_tol=1e-5, v0=None, internal=False, append_trajectory=False, rs=None, nsteps_per_diag=3): super().__init__(atoms_list,savename,logfolder,trajfolder,errorlog,indexes,max_fmax,timeout, restart,master,force_consistent=force_consistent) self.retry = retry #最大のエラーの回数 self.error_count = 0 #エラーをカウント self.delta0 = delta0 self.sigma_inc = sigma_inc self.sigma_dec = sigma_dec self.rho_dec = rho_dec self.rho_inc = rho_inc self.order = order self.eig = eig self.eta = eta self.method = method self.gamma = gamma self.threepoint = threepoint self.constraints = constraints self.constraints_tol = constraints_tol self.v0 = v0 self.internal = internal self.append_trajectory = append_trajectory self.rs = rs self.nsteps_per_diag = nsteps_per_diag def mk_opt_func(self,atoms,logfile,trajectory): return Sella(atoms=atoms,restart=self.restart,logfile=logfile,trajectory=trajectory, master=self.master,force_consistent=self.force_consistent,delta0=self.delta0, sigma_inc=self.sigma_inc,sigma_dec=self.sigma_dec,rho_dec=self.rho_dec, rho_inc=self.rho_inc,order=self.order,eig=self.eig,eta=self.eta,method=self.method, gamma=self.gamma,threepoint=self.threepoint,constraints=self.constraints, constraints_tol=self.constraints_tol,v0=self.v0,internal=self.internal, append_trajectory=self.append_trajectory,rs=self.rs,nsteps_per_diag=self.nsteps_per_diag)
[ドキュメント] def run(self,fmax=0.05, steps=100000000): """計算を実行する Parameters: fmax: 収束条件となるforce steps: integer 最大ステップ数 """ n = self.start_number() # n番目から計算を始める i = n bar = tqdm(total=len(self.atoms_list)) while i < len(self.atoms_list): if Path("stop.log").exists(): self.write_stop_log(i) break atoms = self.atoms_list[i] if self.initialize(i,atoms): try: opt = self.ase_run(atoms,i,fmax,steps) force = round(sqrt((atoms.get_forces()**2).sum(axis=1).max()),4) if opt.converged(): self.write_log(i,0,force) else: if self.dnf: #timeoutの場合 if self.error_count < self.retry and opt.nsteps > 1: self.error_count += 1 self.logger.info(f"{i}: TimeOut {self.error_count}回目の再計算を開始する") continue else: self.write_log(i,5,force) else: self.write_log(i,4,force) except np.linalg.LinAlgError as e: if self.error_count < self.retry: self.error_count += 1 self.logger.info(f"{i}: numpyエラー {self.error_count}回目の再計算を開始する") continue else: force = round(sqrt((atoms.get_forces()**2).sum(axis=1).max()),4) self.write_log(self,i,3,force) except Exception as e: exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] self.write_log(i,8,exc_type,fname,exc_tb.tb_lineno) # trajファイルに保存する if i == 0 and n == 0: #始めは上書き保存 self.save_object(self.save_datas[i],mode="w") else: self.save_object(self.save_datas[i],mode="a") i += 1 self.error_count = 0 bar.update(1)