Source code for mpinterfaces.utils

# coding: utf-8
# Copyright (c) Henniggroup.
# Distributed under the terms of the MIT License.

from __future__ import division, print_function, unicode_literals, \
    absolute_import


"""
Utility functions
"""

from six.moves import range, zip

import itertools as it
from functools import reduce
import linecache
import sys
import os
import math
import socket
import time
import subprocess as sp
import logging
from collections import OrderedDict
import yaml

import numpy as np

from monty.json import MontyEncoder, MontyDecoder
from monty.serialization import loadfn, dumpfn

from pymatgen.core.sites import PeriodicSite
from pymatgen import Structure, Lattice, Element
from pymatgen.core.surface import Slab, SlabGenerator
from pymatgen.io.ase import AseAtomsAdaptor
from pymatgen.io.vasp.inputs import Poscar
from pymatgen.core.composition import Composition
from pymatgen.core.operations import SymmOp
from pymatgen.io.vasp.outputs import Vasprun
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer

from custodian.custodian import Custodian

from fireworks.user_objects.queue_adapters.common_adapter import CommonAdapter

from ase.lattice.surface import surface

from mpinterfaces.default_logger import get_default_logger
from mpinterfaces import (VASP_STD_BIN, QUEUE_SYSTEM, QUEUE_TEMPLATE,
                          VASP_PSP, PACKAGE_PATH)

__author__ = "Kiran Mathew, Joshua J. Gabriel, Michael Ashton"
__copyright__ = "Copyright 2017, Henniggroup"
__maintainer__ = "Joshua J. Gabriel"
__email__ = "joshgabriel92@gmail.com"
__status__ = "Production"
__date__ = "March 3, 2017"

logger = get_default_logger(__name__)


def get_ase_slab(pmg_struct, hkl=(1, 1, 1), min_thick=10, min_vac=10):
    """
    takes in the initial structure as a pymatgen Structure object,
    uses ase to generate the slab and returns a pymatgen Slab object

    Args:
        pmg_struct: pymatgen structure object
        hkl: hkl index of surface of slab to be created
        min_thick: minimum thickness of slab in Angstroms
        min_vac: minimum vacuum spacing
    """
    ase_atoms = AseAtomsAdaptor().get_atoms(pmg_struct)
    pmg_slab_gen = SlabGenerator(pmg_struct, hkl, min_thick, min_vac)
    h = pmg_slab_gen._proj_height
    nlayers = int(math.ceil(pmg_slab_gen.min_slab_size / h))
    ase_slab = surface(ase_atoms, hkl, nlayers)
    ase_slab.center(vacuum=min_vac / 2, axis=2)
    pmg_slab_structure = AseAtomsAdaptor().get_structure(ase_slab)
    return Slab(lattice=pmg_slab_structure.lattice,
                species=pmg_slab_structure.species_and_occu,
                coords=pmg_slab_structure.frac_coords,
                site_properties=pmg_slab_structure.site_properties,
                miller_index=hkl, oriented_unit_cell=pmg_slab_structure,
                shift=0., scale_factor=None, energy=None)

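# Illustrative usage (a sketch; assumes a bulk structure in a local
# 'POSCAR' file, which is not part of this module):
#
# >>> bulk = Structure.from_file('POSCAR')
# >>> slab = get_ase_slab(bulk, hkl=(1, 1, 1), min_thick=10, min_vac=12)
# >>> slab.miller_index
# (1, 1, 1)
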
def slab_from_file(hkl, filename):
    """
    reads in a structure from a file and returns a Slab object.
    useful for reading in 2d/substrate structures from file.

    Args:
        hkl: miller index of the slab in the input file.
        filename: structure file in any format supported by pymatgen

    Returns:
        Slab object
    """
    slab_input = Structure.from_file(filename)
    return Slab(slab_input.lattice,
                slab_input.species_and_occu,
                slab_input.frac_coords,
                hkl,
                Structure.from_sites(slab_input, to_unit_cell=True),
                shift=0,
                scale_factor=np.eye(3, dtype=np.int),
                site_properties=slab_input.site_properties)

def get_magmom_string(structure):
    """
    Based on a POSCAR, returns the string required for the MAGMOM
    setting in the INCAR. Initializes transition metals with 6.0
    bohr magneton and all others with 0.5.

    Args:
        structure (Structure): Pymatgen Structure object

    Returns:
        string with INCAR setting for MAGMOM according to the
        mat2d database calculations
    """
    magmoms, considered = [], []
    for s in structure.sites:
        if s.specie not in considered:
            if s.specie.is_transition_metal:
                magmoms.append('{}*6.0'.format(structure.composition[s.specie]))
            else:
                magmoms.append('{}*0.5'.format(structure.composition[s.specie]))
            considered.append(s.specie)
    return ' '.join(magmoms)

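# Illustrative example (a sketch; assumes an Fe2O3 structure loaded from
# a hypothetical local file; the amounts come from structure.composition):
#
# >>> s = Structure.from_file('POSCAR')   # e.g. Fe2O3
# >>> get_magmom_string(s)
# '2.0*6.0 3.0*0.5'
#
# Fe is a transition metal and gets 6.0 bohr magneton per site; O gets 0.5.
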
def get_magmom_mae(poscar, mag_init):
    """
    returns the MAGMOM list for a magnetic anisotropy energy (MAE)
    calculation: each transition-metal site is initialized with a
    moment of mag_init along the z-direction, all other sites with
    zero moment.
    """
    mae_magmom = []
    sites_dict = poscar.as_dict()['structure']['sites']
    # initialize a magnetic moment on the transition metal
    # in vector form along the z-direction
    for n, s in enumerate(sites_dict):
        if Element(s['label']).is_transition_metal:
            mae_magmom.append([0.0, 0.0, mag_init])
        else:
            mae_magmom.append([0.0, 0.0, 0.0])
    return sum(mae_magmom, [])

def get_magmom_afm(poscar, database=None):
    """
    returns the antiferromagnetic MAGMOM list (one entry per site,
    alternating signs) and a Poscar whose structure has been expanded
    to a supercell if the site count was odd.
    """
    afm_magmom = []
    orig_structure_name = poscar.comment
    if len(poscar.structure) % 2 != 0:
        if database == 'twod':
            # no need for more vacuum spacing
            poscar.structure.make_supercell([2, 2, 1])
        else:
            # for bulk structure
            poscar.structure.make_supercell([2, 2, 2])
    sites_dict = poscar.as_dict()['structure']['sites']
    for n, s in enumerate(sites_dict):
        if Element(s['label']).is_transition_metal:
            if n % 2 == 0:
                afm_magmom.append(6.0)
            else:
                afm_magmom.append(-6.0)
        else:
            if n % 2 == 0:
                afm_magmom.append(0.5)
            else:
                afm_magmom.append(-0.5)
    return afm_magmom, Poscar(structure=poscar.structure,
                              comment=orig_structure_name)

def get_run_cmmnd(nnodes=1, ntasks=16, walltime='10:00:00',
                  job_bin=None, job_name=None, mem=None):
    """
    returns the fireworks CommonAdapter based on the queue system
    specified by mpint_config.yaml and the submit file template,
    also specified in mpint_config.yaml

    NOTE: for the job_bin, please specify the mpi command as well:
          Eg: mpiexec /path/to/binary
    """
    d = {}
    job_cmd = None
    # fall back to the configured VASP binary; this must happen before
    # the template update so that 'rocket_launch' picks up the default
    if job_bin is None:
        job_bin = VASP_STD_BIN
    qtemp_file = open(QUEUE_TEMPLATE + 'qtemplate.yaml')
    qtemp = yaml.load(qtemp_file)
    qtemp_file.close()
    qtemp.update({'nodes': nnodes, 'ntasks': ntasks, 'walltime': walltime,
                  'rocket_launch': job_bin, 'job_name': job_name, 'mem': mem})
    # SLURM queue
    if QUEUE_SYSTEM == 'slurm':
        d = {'type': 'SLURM', 'params': qtemp}
    # PBS queue
    elif QUEUE_SYSTEM == 'pbs':
        d = {'type': 'PBS', 'params': qtemp}
    else:
        job_cmd = ['ls', '-lt']
    if d:
        return (CommonAdapter(d['type'], **d['params']), job_cmd)
    else:
        return (None, job_cmd)

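# Illustrative usage (a sketch; the queue system and qtemplate.yaml come
# from the user's mpint_config.yaml setup, and the binary path is
# hypothetical):
#
# >>> qadapter, job_cmd = get_run_cmmnd(nnodes=2, ntasks=32,
# ...                                   walltime='24:00:00',
# ...                                   job_bin='mpiexec /path/to/vasp_std',
# ...                                   job_name='relax', mem=1000)
#
# qadapter is a fireworks CommonAdapter if QUEUE_SYSTEM is 'slurm' or
# 'pbs'; otherwise it is None and job_cmd falls back to ['ls', '-lt'].
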
def get_job_state(job):
    """
    Args:
        job: job

    Returns:
        the job state and the job output file name
    """
    ofname = None
    # pbs
    if QUEUE_SYSTEM == 'pbs':
        try:
            output = sp.check_output(['qstat', '-i', job.job_id]).decode('utf-8')
            state = output.rstrip('\n').split('\n')[-1].split()[-2]
        except:
            logger.info('Job {} not in the queue'.format(job.job_id))
            state = "00"
        ofname = "FW_job.out"
    # slurm
    elif QUEUE_SYSTEM == 'slurm':
        try:
            output = sp.check_output(['squeue', '--job', job.job_id]).decode('utf-8')
            state = output.rstrip('\n').split('\n')[-1].split()[-4]
        except:
            logger.info('Job {} not in the queue.'.format(job.job_id))
            logger.info('This could mean either the batch system crashed '
                        '(highly unlikely) or the job completed a long time ago')
            state = "00"
        ofname = "vasp_job-" + str(job.job_id) + ".out"
    # no batch system
    else:
        state = 'XX'
    return state, ofname

def update_checkpoint(job_ids=None, jfile=None, **kwargs):
    """
    rerun the jobs with job ids in the job_ids list. The jobs are
    read from the json checkpoint file, jfile. If no job_ids are
    given, the checkpoint file is simply updated with the
    corresponding final energies.

    Args:
        job_ids: list of job ids to update or rerun
        jfile: check point file
    """
    cal_log = loadfn(jfile, cls=MontyDecoder)
    cal_log_new = []
    all_jobs = []
    run_jobs = []
    handlers = []
    final_energy = None
    incar = None
    kpoints = None
    qadapter = None
    # if updating the specs of the job
    for k, v in kwargs.items():
        if k == 'incar':
            incar = v
        if k == 'kpoints':
            kpoints = v
        if k == 'que':
            qadapter = v
    for j in cal_log:
        job = j["job"]
        job.job_id = j['job_id']
        all_jobs.append(job)
        if job_ids and (j['job_id'] in job_ids or job.job_dir in job_ids):
            logger.info('setting job {0} in {1} to rerun'.format(j['job_id'],
                                                                 job.job_dir))
            contcar_file = job.job_dir + os.sep + 'CONTCAR'
            poscar_file = job.job_dir + os.sep + 'POSCAR'
            if os.path.isfile(contcar_file) and \
                    len(open(contcar_file).readlines()) != 0:
                logger.info('setting poscar file from {}'.format(contcar_file))
                job.vis.poscar = Poscar.from_file(contcar_file)
            else:
                logger.info('setting poscar file from {}'.format(poscar_file))
                job.vis.poscar = Poscar.from_file(poscar_file)
            if incar:
                logger.info('incar overridden')
                job.vis.incar = incar
            if kpoints:
                logger.info('kpoints overridden')
                job.vis.kpoints = kpoints
            if qadapter:
                logger.info('qadapter overridden')
                job.vis.qadapter = qadapter
            run_jobs.append(job)
    if run_jobs:
        c = Custodian(handlers, run_jobs, max_errors=5)
        c.run()
    for j in all_jobs:
        final_energy = j.get_final_energy()
        cal_log_new.append({"job": j.as_dict(),
                            'job_id': j.job_id,
                            "corrections": [],
                            'final_energy': final_energy})
    dumpfn(cal_log_new, jfile, cls=MontyEncoder, indent=4)

def jobs_from_file(filename='calibrate.json'):
    """
    read in a json file of the format of calibrate.json (the default
    logfile created when jobs are run through calibrate) and return
    the list of job objects.

    Args:
        filename: checkpoint file name

    Returns:
        list of all jobs
    """
    caljobs = loadfn(filename, cls=MontyDecoder)
    all_jobs = []
    for j in caljobs:
        job = j["job"]
        job.job_id = j['job_id']
        job.final_energy = j['final_energy']
        all_jobs.append(job)
    return all_jobs

def launch_daemon(steps, interval, handlers=None, ld_logger=None):
    """
    run all the 'steps' in daemon mode,
    checks job status every 'interval' seconds,
    also runs all the error handlers
    """
    if ld_logger:
        global logger
        logger = ld_logger
    chkpt_files_prev = None
    for step in steps:
        chkpt_files = step(checkpoint_files=chkpt_files_prev)
        chkpt_files_prev = chkpt_files
        if not chkpt_files:
            return None
        while True:
            done = []
            reruns = []
            for cf in chkpt_files:
                time.sleep(3)
                update_checkpoint(job_ids=reruns, jfile=cf)
                all_jobs = jobs_from_file(cf)
                for j in all_jobs:
                    state, ofname = get_job_state(j)
                    if j.final_energy:
                        done = done + [True]
                    elif state == 'R':
                        logger.info('job {} running'.format(j.job_id))
                        done = done + [False]
                    elif state in ['C', 'CF', 'F', '00']:
                        logger.error(
                            'Job {0} in {1} cancelled or failed. State = {2}'.
                            format(j.job_id, j.job_dir, state))
                        done = done + [False]
                        if handlers:
                            logger.info('Investigating ... ')
                            os.chdir(j.job_dir)
                            if ofname:
                                if os.path.exists(ofname):
                                    for h in handlers:
                                        h.output_filename = ofname
                                        if h.check():
                                            logger.error(
                                                'Detected vasp errors {}'.format(
                                                    h.errors))
                                            # TODO: correct the error and mark the
                                            # job for rerun. all error handling must
                                            # be done using proper error handlers
                                            # h.correct()
                                            # reruns.append(j.job_id)
                                else:
                                    logger.error(
                                        'stdout redirect file not generated, '
                                        'job {} will be rerun'.format(j.job_id))
                                    reruns.append(j.job_id)
                            os.chdir(j.parent_job_dir)
                    else:
                        logger.info(
                            'Job {0} pending. State = {1}'.format(j.job_id, state))
                        done = done + [False]
            if all(done):
                logger.info(
                    'all jobs in {} done. Proceeding to the next one'.format(
                        step.__name__))
                time.sleep(5)
                break
            logger.info(
                'all jobs in {0} NOT done. Next update in {1} seconds'.format(
                    step.__name__, interval))
            time.sleep(interval)

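# Illustrative usage (a sketch; step_1 and step_2 are hypothetical
# workflow functions that accept a checkpoint_files keyword and return
# the list of checkpoint files they created):
#
# >>> launch_daemon([step_1, step_2], interval=600, handlers=None)
#
# The daemon blocks on each step until all of its jobs report a final
# energy, polling the queue every `interval` seconds.
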
def get_convergence_data(jfile, params=('ENCUT', 'KPOINTS')):
    """
    returns a data dict in the following format
    {'Al': {'ENCUT': [[500, 1.232], [600, 0.8798]],
            'KPOINTS': [[], []]},
     'W': ...}

    Note: processes only INCAR parameters and KPOINTS
    """
    cutoff_jobs = jobs_from_file(jfile)
    data = {}
    for j in cutoff_jobs:
        jdir = os.path.join(j.parent_job_dir, j.job_dir)
        poscar_file = os.path.join(jdir, 'POSCAR')
        struct_m = Structure.from_file(poscar_file)
        species = ''.join([tos.symbol for tos in struct_m.types_of_specie])
        # initialize the entry for this species so that the first job
        # encountered is recorded as well
        if not data.get(species):
            data[species] = {p: [] for p in params}
        for p in params:
            if j.vis.incar.get(p):
                data[species][p].append([j.vis.incar[p],
                                         j.final_energy / len(struct_m)])
            elif p == 'KPOINTS':
                data[species]['KPOINTS'].append([j.vis.kpoints.kpts,
                                                 j.final_energy / len(struct_m)])
            else:
                logger.warn(
                    'dont know how to parse the parameter {}'.format(p))
    return data

def get_opt_params(data, species, param='ENCUT', ev_per_atom=0.001):
    """
    return the optimum parameter
    default: 1 meV/atom
    """
    sorted_list = sorted(data[species][param], key=lambda x: x[1])
    sorted_array = np.array(sorted_list)
    consecutive_diff = np.abs(
        sorted_array[:-1, 1] - sorted_array[1:, 1] - ev_per_atom)
    min_index = np.argmin(consecutive_diff)
    return sorted_list[min_index][0]

# PLEASE DONT CHANGE THINGS WITHOUT UPDATING SCRIPTS/MODULES THAT DEPEND
# ON IT
# get_convergence_data and get_opt_params moved to *_custom

def get_convergence_data_custom(jfile, params=('ENCUT', 'KPOINTS')):
    """
    returns a data dict in the following format
    {'Al': {'ENCUT': [[500, 1.232], [600, 0.8798]],
            'KPOINTS': [[], []]},
     'W': ...}

    Note: processes only INCAR parameters and KPOINTS.
    Sufficient tagging of the data is assumed from the species,
    the Poscar comment line and the potcar functional
    """
    cutoff_jobs = jobs_from_file(jfile)
    data = {}
    for j in cutoff_jobs:
        jdir = os.path.join(j.parent_job_dir, j.job_dir)
        poscar_file = os.path.join(jdir, 'POSCAR')
        struct_m = Structure.from_file(poscar_file)
        species = ''.join([tos.symbol for tos in struct_m.types_of_specie])
        tag = '_'.join([species, Poscar.from_file(poscar_file).comment,
                        j.vis.potcar.functional])
        # initialize the entry for this tag so that the first job
        # encountered is recorded as well
        if not data.get(tag):
            data[tag] = {p: [] for p in params}
        for p in params:
            if j.vis.incar.get(p):
                data[tag][p].append([j.vis.incar[p],
                                     j.final_energy / len(struct_m),
                                     j.vis.potcar, j.vis.poscar])
            elif p == 'KPOINTS':
                data[tag]['KPOINTS'].append([j.vis.kpoints.kpts,
                                             j.final_energy / len(struct_m),
                                             j.vis.potcar, j.vis.poscar])
            else:
                logger.warn(
                    'dont know how to parse the parameter {}'.format(p))
    return data

def get_opt_params_custom(data, tag, param='ENCUT', ev_per_atom=1.0):
    """
    Args:
        data: dictionary of convergence data
        tag: key to the dictionary of convergence data
        param: parameter to be optimized
        ev_per_atom: convergence criterion in eV per atom

    Returns:
        [list] optimum parameter set consisting of the tag, potcar
        object, poscar object, optimum parameter value and the list
        of convergence energies sorted according to param
    """
    sorted_list = sorted(data[tag][param], key=lambda x: x[0])
    # energies of the convergence data, sorted by the parameter value
    t = np.array(sorted_list)[:, 1]
    consecutive_diff = [float(j) - float(i) - ev_per_atom
                        for i, j in zip(t[:-1], t[1:])]
    min_index = np.argmin(consecutive_diff)
    # return the tag, potcar object, poscar object, incar setting and
    # convergence data for plotting that is optimum
    return [tag, data[tag][param][min_index][2],
            data[tag][param][min_index][3],
            sorted_list[min_index][0], t]

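# Illustrative usage (a sketch; the tag string is hypothetical and has
# the form '<species>_<POSCAR comment>_<POTCAR functional>'):
#
# >>> data = get_convergence_data_custom('calibrate.json')
# >>> tag, potcar, poscar, best_encut, energies = get_opt_params_custom(
# ...     data, 'Si_bulk Si_PBE', param='ENCUT', ev_per_atom=0.001)
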
def partition_jobs(turn_knobs, max_jobs):
    """
    divide turn_knobs into smaller turn_knobs so that each
    one of them contains roughly max_jobs jobs
    """
    params_len = [len(v) for k, v in turn_knobs.items()]
    n_total_jobs = reduce(lambda x, y: x * y, params_len)
    # guard against max_jobs exceeding the total number of jobs
    partition_size = max(1, int(n_total_jobs / max_jobs))
    max_index = np.argmax(params_len)
    max_len = max(params_len)
    max_key = list(turn_knobs.items())[max_index][0]
    partition = list(range(0, max_len, max(1, int(max_len / partition_size))))
    partition_1 = partition[1:] + [max_len]
    logger.info(
        '{0} list of length {1} will be partitioned into {2} chunks'.format(
            max_key, max_len, len(partition)))
    turn_knobs_list = []
    name_list = []
    for i, j in zip(partition, partition_1):
        ordered_list = []
        for k, v in turn_knobs.items():
            if k == max_key:
                tk_item = (k, v[i:j])
            else:
                tk_item = (k, v)
            ordered_list.append(tk_item)
        turn_knobs_list.append(OrderedDict(ordered_list))
        name_list.append('_'.join([str(i), str(j)]))
    return turn_knobs_list, name_list

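# Worked example (a sketch): splitting 12 ENCUT values into chunks of
# roughly 4 jobs each.
#
# >>> tk = OrderedDict([('ENCUT', list(range(400, 1000, 50)))])  # 12 values
# >>> tk_list, names = partition_jobs(tk, max_jobs=4)
# >>> names
# ['0_4', '4_8', '8_12']
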
def get_logger(log_file_name):
    """
    writes out a logging file. Very useful for project logging;
    recommended for monitoring the start and completion of steps
    in the workflow

    Args:
        log_file_name: name of the log file, log_file_name.log
    """
    loggr = logging.getLogger(log_file_name)
    loggr.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(message)s')
    fh = logging.FileHandler(log_file_name + '.log', mode='a')
    fh.setFormatter(formatter)
    loggr.addHandler(fh)
    return loggr

def set_sd_flags(poscar_input=None, n_layers=2, top=True, bottom=True,
                 poscar_output='POSCAR2'):
    """
    set the relaxation flags for the top and bottom layers of an
    interface. The upper and lower bounds of the z coordinate are
    determined based on the slab.

    Args:
        poscar_input: input poscar file name
        n_layers: number of layers to be relaxed
        top: whether n_layers from the top are to be relaxed
        bottom: whether n_layers from the bottom are to be relaxed
        poscar_output: output poscar file name

    Returns:
        None (writes the modified poscar file)
    """
    poscar1 = Poscar.from_file(poscar_input)
    sd_flags = np.zeros_like(poscar1.structure.frac_coords)
    z_coords = poscar1.structure.frac_coords[:, 2]
    z_lower_bound, z_upper_bound = None, None
    if bottom:
        z_lower_bound = np.unique(z_coords)[n_layers - 1]
        sd_flags[np.where(z_coords <= z_lower_bound)] = np.ones((1, 3))
    if top:
        z_upper_bound = np.unique(z_coords)[-n_layers]
        sd_flags[np.where(z_coords >= z_upper_bound)] = np.ones((1, 3))
    poscar2 = Poscar(poscar1.structure,
                     selective_dynamics=sd_flags.tolist())
    poscar2.write_file(filename=poscar_output)

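# Illustrative usage (a sketch; operates on a hypothetical local POSCAR):
#
# >>> set_sd_flags(poscar_input='POSCAR', n_layers=2,
# ...              top=True, bottom=True, poscar_output='POSCAR2')
#
# Sites within the two lowest or two highest unique fractional-z values
# get 'T T T' selective-dynamics flags; all other sites get 'F F F'.
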
def is_converged(directory):
    """
    Check if a relaxation has converged.

    Args:
        directory (str): path to directory to check.

    Returns:
        boolean. Whether or not the job is converged.
    """
    try:
        return Vasprun('{}/vasprun.xml'.format(directory)).converged
    except:
        return False

def get_spacing(structure):
    """
    Returns the interlayer spacing for a 2D material or slab.

    Args:
        structure (Structure): Structure to check spacing for.

    Returns:
        float. Spacing in Angstroms.
    """
    structure = align_axis(structure)
    structure = center_slab(structure)
    max_height = max([s.coords[2] for s in structure.sites])
    min_height = min([s.coords[2] for s in structure.sites])
    return structure.lattice.c - (max_height - min_height)

def center_slab(structure):
    """
    Centers the atoms in a slab structure around 0.5
    fractional height.

    Args:
        structure (Structure): Structure to center

    Returns:
        Centered Structure object.
    """
    center = np.average([s.frac_coords[2] for s in structure.sites])
    translation = (0, 0, 0.5 - center)
    structure.translate_sites(range(len(structure.sites)), translation)
    return structure

def add_vacuum(structure, vacuum):
    """
    Adds padding to a slab or 2D material.

    Args:
        structure (Structure): Structure to add vacuum to
        vacuum (float): Vacuum thickness to add in Angstroms

    Returns:
        Structure object with vacuum added.
    """
    structure = align_axis(structure)
    coords = [s.coords for s in structure.sites]
    species = [s.specie for s in structure.sites]
    # work on a copy so the (possibly read-only) lattice matrix of
    # the input structure is not modified in place
    lattice = np.array(structure.lattice.matrix)
    lattice[2][2] += vacuum
    structure = Structure(lattice, species, coords,
                          coords_are_cartesian=True)
    return center_slab(structure)

def ensure_vacuum(structure, vacuum):
    """
    Adds padding to a slab or 2D material until the desired
    amount of vacuum is reached.

    Args:
        structure (Structure): Structure to add vacuum to
        vacuum (float): Final desired vacuum thickness in Angstroms

    Returns:
        Structure object with vacuum added.
    """
    structure = align_axis(structure)
    spacing = get_spacing(structure)
    structure = add_vacuum(structure, vacuum - spacing)
    return center_slab(structure)

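# Illustrative usage (a sketch; assumes a 2D structure in a hypothetical
# local POSCAR):
#
# >>> s = Structure.from_file('POSCAR')
# >>> s = ensure_vacuum(s, 20)       # pad to 20 Angstroms of vacuum
# >>> round(get_spacing(s), 1)
# 20.0
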
def get_rotation_matrix(axis, theta):
    """
    Find the rotation matrix associated with counterclockwise rotation
    about the given axis by theta radians.
    Credit: http://stackoverflow.com/users/190597/unutbu

    Args:
        axis (list): rotation axis of the form [x, y, z]
        theta (float): rotational angle in radians

    Returns:
        array. Rotation matrix.
    """
    axis = np.array(list(axis))
    axis = axis / np.linalg.norm(axis)
    axis *= -np.sin(theta / 2.0)
    a = np.cos(theta / 2.0)
    b, c, d = tuple(axis.tolist())
    aa, bb, cc, dd = a * a, b * b, c * c, d * d
    bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
    return np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
                     [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
                     [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]])

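# Quick sanity check (a sketch): a counterclockwise rotation of the
# x unit vector by 90 degrees about z should give the y unit vector.
#
# >>> R = get_rotation_matrix([0, 0, 1], np.pi / 2)
# >>> np.allclose(np.dot(R, [1, 0, 0]), [0, 1, 0])
# True
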
def align_axis(structure, axis='c', direction=(0, 0, 1)):
    """
    Rotates a structure so that the specified axis is along
    the [001] direction. This is useful for adding vacuum, and
    in general for using vasp compiled with no z-axis relaxation.

    Args:
        structure (Structure): Pymatgen Structure object to rotate.
        axis: Axis to be rotated. Can be 'a', 'b', 'c', or a 1x3 vector.
        direction (vector): Final axis to be rotated to.

    Returns:
        structure. Rotated to align axis along direction.
    """
    if axis == 'a':
        axis = structure.lattice._matrix[0]
    elif axis == 'b':
        axis = structure.lattice._matrix[1]
    elif axis == 'c':
        axis = structure.lattice._matrix[2]
    proj_axis = np.cross(axis, direction)
    if not (proj_axis[0] == 0 and proj_axis[1] == 0):
        theta = (
            np.arccos(np.dot(axis, direction) /
                      (np.linalg.norm(axis) * np.linalg.norm(direction)))
        )
        R = get_rotation_matrix(proj_axis, theta)
        rotation = SymmOp.from_rotation_and_translation(rotation_matrix=R)
        structure.apply_operation(rotation)
    return structure

def get_structure_type(structure, write_poscar_from_cluster=False):
    """
    This is a topology-scaling algorithm used to describe the
    periodicity of bonded clusters in a bulk structure.

    Args:
        structure (structure): Pymatgen structure object to classify.
        write_poscar_from_cluster (bool): Set to True to write a
            POSCAR from the sites in the cluster.

    Returns:
        string. 'molecular' (0D), 'chain' (1D), 'layered' (2D),
        'heterogeneous' (intercalated 3D), or 'conventional' (3D)
    """
    # The conventional standard structure is much easier to work with.
    structure = SpacegroupAnalyzer(
        structure).get_conventional_standard_structure()

    # Noble gases don't have well-defined bonding radii.
    if not len([e for e in structure.composition
                if e.symbol in ['He', 'Ne', 'Ar', 'Kr', 'Xe']]) == 0:
        type = 'noble gas'
    else:
        if len(structure.sites) < 45:
            structure.make_supercell(2)

        # Create a dict of sites as keys and lists of their
        # bonded neighbors as values.
        sites = structure.sites
        bonds = {}
        for site in sites:
            bonds[site] = []

        for i in range(len(sites)):
            site_1 = sites[i]
            for site_2 in sites[i + 1:]:
                if (site_1.distance(site_2) <
                        float(Element(site_1.specie).atomic_radius +
                              Element(site_2.specie).atomic_radius) * 1.1):
                    bonds[site_1].append(site_2)
                    bonds[site_2].append(site_1)

        # Assimilate all bonded atoms in a cluster; terminate
        # when it stops growing.
        cluster_terminated = False
        while not cluster_terminated:
            original_cluster_size = len(bonds[sites[0]])
            for site in bonds[sites[0]]:
                bonds[sites[0]] += [
                    s for s in bonds[site] if s not in bonds[sites[0]]]
            if len(bonds[sites[0]]) == original_cluster_size:
                cluster_terminated = True

        original_cluster = bonds[sites[0]]

        if len(bonds[sites[0]]) == 0:
            # i.e. the cluster is a single atom.
            type = 'molecular'
        elif len(bonds[sites[0]]) == len(sites):
            # i.e. all atoms are bonded.
            type = 'conventional'
        else:
            # If the cluster's composition is not equal to the
            # structure's overall composition, it is a heterogeneous
            # compound.
            cluster_composition_dict = {}
            for site in bonds[sites[0]]:
                if Element(site.specie) in cluster_composition_dict:
                    cluster_composition_dict[Element(site.specie)] += 1
                else:
                    cluster_composition_dict[Element(site.specie)] = 1
            uniform = True
            if len(cluster_composition_dict):
                cmp = Composition.from_dict(cluster_composition_dict)
                if cmp.reduced_formula != structure.composition.reduced_formula:
                    uniform = False
            if not uniform:
                type = 'heterogeneous'
            else:
                # Make a 2x2x2 supercell and recalculate the
                # cluster's new size. If the new cluster size is
                # the same as the old size, it is a non-periodic
                # molecule. If it is 2x as big, it's a 1D chain.
                # If it's 4x as big, it is a layered material.
                old_cluster_size = len(bonds[sites[0]])
                structure.make_supercell(2)
                sites = structure.sites
                bonds = {}
                for site in sites:
                    bonds[site] = []

                for i in range(len(sites)):
                    site_1 = sites[i]
                    for site_2 in sites[i + 1:]:
                        if (site_1.distance(site_2) <
                                float(Element(site_1.specie).atomic_radius +
                                      Element(site_2.specie).atomic_radius) * 1.1):
                            bonds[site_1].append(site_2)
                            bonds[site_2].append(site_1)

                cluster_terminated = False
                while not cluster_terminated:
                    original_cluster_size = len(bonds[sites[0]])
                    for site in bonds[sites[0]]:
                        bonds[sites[0]] += [
                            s for s in bonds[site]
                            if s not in bonds[sites[0]]]
                    if len(bonds[sites[0]]) == original_cluster_size:
                        cluster_terminated = True

                if len(bonds[sites[0]]) != 4 * old_cluster_size:
                    type = 'molecular'
                else:
                    type = 'layered'

    if write_poscar_from_cluster:
        Structure.from_sites(original_cluster).to('POSCAR', 'POSCAR')

    return type

def write_potcar(pot_path=VASP_PSP, types='None'):
    """
    Writes a POTCAR file based on a list of types.

    Args:
        pot_path (str): can be changed to override the default
            location of POTCAR files.
        types (list): list of same length as the number of elements
            containing specifications for the kind of potential
            desired for each element, e.g. ['Na_pv', 'O_s']. If left
            as 'None', uses the defaults in the 'potcar_symbols.yaml'
            file in the package root.
    """
    if pot_path is None:
        # This probably means the config.yaml file has not
        # been set up.
        pass
    else:
        poscar = open('POSCAR', 'r')
        lines = poscar.readlines()
        elements = lines[5].split()
        poscar.close()

        potcar_symbols = loadfn(
            os.path.join(PACKAGE_PATH, 'mat2d', 'potcar_symbols.yaml'))
        if types == 'None':
            sorted_types = [potcar_symbols[elt] for elt in elements]
        else:
            sorted_types = []
            for elt in elements:
                for t in types:
                    if t.split('_')[0] == elt:
                        sorted_types.append(t)

        # Create paths, open files, and write files to
        # POTCAR for each potential.
        potentials = []
        for potential in sorted_types:
            potentials.append('{}/{}/POTCAR'.format(pot_path, potential))

        outfile = open('POTCAR', 'w')
        for potential in potentials:
            infile = open(potential)
            for line in infile:
                outfile.write(line)
            infile.close()
        outfile.close()

def write_circle_mesh_kpoints(center=[0, 0, 0], radius=0.1, resolution=20):
    """
    Create a circular mesh of k-points centered around a specific
    k-point and write it to the KPOINTS file. Non-circular meshes
    are not supported, but would be easy to code. All k-point
    weights are set to 1.

    Args:
        center (list): x, y, and z coordinates of the mesh center.
            Defaults to Gamma.
        radius (float): Size of the mesh in inverse Angstroms.
        resolution (int): Number of mesh divisions along the radius
            in the 3 primary directions.
    """
    kpoints = []
    step = radius / resolution
    for i in range(-resolution, resolution):
        for j in range(-resolution, resolution):
            if i**2 + j**2 <= resolution**2:
                kpoints.append([str(center[0] + step * i),
                                str(center[1] + step * j), '0', '1'])
    with open('KPOINTS', 'w') as kpts:
        kpts.write('KPOINTS\n{}\ndirect\n'.format(len(kpoints)))
        for kpt in kpoints:
            kpts.write(' '.join(kpt))
            kpts.write('\n')

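# Illustrative usage (a sketch): a disc of k-points around Gamma.
#
# >>> write_circle_mesh_kpoints(center=[0, 0, 0], radius=0.05,
# ...                           resolution=10)
#
# Writes a KPOINTS file in 'direct' mode containing every grid point
# (i, j) with i**2 + j**2 <= resolution**2, i.e. roughly
# pi * resolution**2 ~ 314 points, each with weight 1.
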
def get_markovian_path(points):
    """
    Calculates the shortest path connecting an array of 2D
    points. Useful for sorting linemode k-points.

    Args:
        points (list): list/array of points of the format
            [[x_1, y_1, z_1], [x_2, y_2, z_2], ...]

    Returns:
        list: A sorted list of the points in order on the
        markovian path.
    """
    def dist(x, y):
        return math.hypot(y[0] - x[0], y[1] - x[1])

    paths = [p for p in it.permutations(points)]
    path_distances = [
        sum(map(lambda x: dist(x[0], x[1]), zip(p[:-1], p[1:])))
        for p in paths]
    min_index = np.argmin(path_distances)
    return paths[min_index]

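# Worked example (a sketch). Note that the brute-force permutation
# search is O(n!), so this is only practical for the small point sets
# found in linemode KPOINTS files:
#
# >>> get_markovian_path([[0, 0, 0], [1, 1, 0], [0.5, 0, 0]])
# ([0, 0, 0], [0.5, 0, 0], [1, 1, 0])
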
def remove_z_kpoints():
    """
    Strips all linemode k-points from the KPOINTS file that include a
    z-component, since these are not relevant for 2D materials
    and slabs.
    """
    kpoint_file = open('KPOINTS')
    kpoint_lines = kpoint_file.readlines()
    kpoint_file.close()

    twod_kpoints = []
    labels = {}
    i = 4
    while i < len(kpoint_lines):
        kpt_1 = kpoint_lines[i].split()
        kpt_2 = kpoint_lines[i + 1].split()
        if float(kpt_1[2]) == 0.0 and \
                [float(kpt_1[0]), float(kpt_1[1])] not in twod_kpoints:
            twod_kpoints.append([float(kpt_1[0]), float(kpt_1[1])])
            labels[kpt_1[4]] = [float(kpt_1[0]), float(kpt_1[1])]
        if float(kpt_2[2]) == 0.0 and \
                [float(kpt_2[0]), float(kpt_2[1])] not in twod_kpoints:
            twod_kpoints.append([float(kpt_2[0]), float(kpt_2[1])])
            labels[kpt_2[4]] = [float(kpt_2[0]), float(kpt_2[1])]
        i += 3

    kpath = get_markovian_path(twod_kpoints)

    with open('KPOINTS', 'w') as kpts:
        for line in kpoint_lines[:4]:
            kpts.write(line)
        for i in range(len(kpath)):
            label_1 = [l for l in labels if labels[l] == kpath[i]][0]
            if i == len(kpath) - 1:
                kpt_2 = kpath[0]
                label_2 = [l for l in labels if labels[l] == kpath[0]][0]
            else:
                kpt_2 = kpath[i + 1]
                label_2 = [l for l in labels if labels[l] == kpath[i + 1]][0]
            kpts.write(' '.join([str(kpath[i][0]), str(kpath[i][1]),
                                 '0.0 !', label_1]))
            kpts.write('\n')
            kpts.write(' '.join([str(kpt_2[0]), str(kpt_2[1]),
                                 '0.0 !', label_2]))
            kpts.write('\n\n')

def update_submission_template(default_template, qtemplate):
    """
    helper function for writing a CommonAdapter template fireworks
    submission file. Based on a provided default_template, which
    contains the hpc resource allocation information, and the
    qtemplate, which is a yaml of commonly modified user arguments
    """
    pass

def write_pbs_runjob(name, nnodes, nprocessors, pmem, walltime, binary):
    """
    writes a runjob based on a name, nnodes, nprocessors, pmem,
    walltime, and binary. Designed for runjobs on the Hennig
    group_list on HiperGator 1 (PBS).

    Args:
        name (str): job name.
        nnodes (int): number of requested nodes.
        nprocessors (int): number of requested processors.
        pmem (str): requested memory including units, e.g. '1600mb'.
        walltime (str): requested wall time, hh:mm:ss e.g. '2:00:00'.
        binary (str): absolute path to the binary to run.
    """
    runjob = open('runjob', 'w')
    runjob.write('#!/bin/sh\n')
    runjob.write('#PBS -N {}\n'.format(name))
    runjob.write('#PBS -o test.out\n')
    runjob.write('#PBS -e test.err\n')
    runjob.write('#PBS -r n\n')
    runjob.write('#PBS -l walltime={}\n'.format(walltime))
    runjob.write('#PBS -l nodes={}:ppn={}\n'.format(nnodes, nprocessors))
    runjob.write('#PBS -l pmem={}\n'.format(pmem))
    runjob.write('#PBS -W group_list=hennig\n\n')
    runjob.write('cd $PBS_O_WORKDIR\n\n')
    runjob.write('mpirun {} > job.log\n\n'.format(binary))
    runjob.write('echo \'Done.\'\n')
    runjob.close()

def write_slurm_runjob(name, ntasks, pmem, walltime, binary):
    """
    writes a runjob based on a name, ntasks, pmem, walltime, and
    binary. Designed for runjobs on the Hennig group_list on
    HiperGator 2 (SLURM).

    Args:
        name (str): job name.
        ntasks (int): total number of requested processors.
        pmem (str): requested memory including units, e.g. '1600mb'.
        walltime (str): requested wall time, hh:mm:ss e.g. '2:00:00'.
        binary (str): absolute path to the binary to run.
    """
    nnodes = int(np.ceil(float(ntasks) / 32.0))
    runjob = open('runjob', 'w')
    runjob.write('#!/bin/bash\n')
    runjob.write('#SBATCH --job-name={}\n'.format(name))
    runjob.write('#SBATCH -o out_%j.log\n')
    runjob.write('#SBATCH -e err_%j.log\n')
    runjob.write('#SBATCH --qos=hennig-b\n')
    runjob.write('#SBATCH --nodes={}\n'.format(nnodes))
    runjob.write('#SBATCH --ntasks={}\n'.format(ntasks))
    runjob.write('#SBATCH --mem-per-cpu={}\n'.format(pmem))
    runjob.write('#SBATCH -t {}\n\n'.format(walltime))
    runjob.write('cd $SLURM_SUBMIT_DIR\n\n')
    runjob.write('module load intel/2016.0.109\n')
    runjob.write('module load openmpi/1.10.1\n')
    runjob.write('module load vasp/5.4.1\n\n')
    runjob.write('mpirun {} > job.log\n\n'.format(binary))
    runjob.write('echo \'Done.\'\n')
    runjob.close()

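# Illustrative usage (a sketch; the binary path is hypothetical and the
# module loads in the generated script are site-specific):
#
# >>> write_slurm_runjob('relax_MoS2', ntasks=32, pmem='1600mb',
# ...                    walltime='12:00:00', binary='/path/to/vasp_std')
#
# Requests ceil(32 / 32) = 1 node and writes the submission script to a
# file named 'runjob' in the current directory.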