Source code for mpinterfaces.database

# coding: utf-8
# Copyright (c) Henniggroup.
# Distributed under the terms of the MIT License.

from __future__ import division, print_function, unicode_literals, \
    absolute_import

"""
Put calculation data into a MongoDB database.
"""

from six.moves import range

import sys
import os
import json
import logging
import socket
import string
import datetime

import numpy as np

from pymatgen.core.structure import Structure
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
from pymatgen.analysis.bond_valence import BVAnalyzer

from matgendb.creator import VaspToDbTaskDrone
from matgendb.creator import logger as mgdb_logger

# Module-level logger: passes DEBUG and above to a stdout stream handler
# that formats records as "LEVEL:logger-name:message".
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
sh = logging.StreamHandler(stream=sys.stdout)
sh.setFormatter(formatter)
logger.addHandler(sh)
# Attach the same stdout handler to the logger imported from
# matgendb.creator so its records are printed in the same format.
mgdb_logger.addHandler(sh)


class MPINTVaspToDbTaskDrone(VaspToDbTaskDrone):
    """
    Subclass of VaspToDbTaskDrone with customized document generation
    (``generate_doc``) and post-processing (``post_process``).
    """

    def __init__(self, host="127.0.0.1", port=27017, database="vasp",
                 user=None, password=None, collection="nanoparticles",
                 parse_dos=False, compress_dos=False,
                 simulate_mode=False, additional_fields=None,
                 update_duplicates=True, mapi_key=None,
                 use_full_uri=True, runs=None):
        """
        Forward every argument unchanged to VaspToDbTaskDrone.__init__;
        see that class for the meaning of the individual parameters.
        """
        VaspToDbTaskDrone.__init__(self, host=host, port=port,
                                   database=database, user=user,
                                   password=password,
                                   collection=collection,
                                   parse_dos=parse_dos,
                                   compress_dos=compress_dos,
                                   simulate_mode=simulate_mode,
                                   additional_fields=additional_fields,
                                   update_duplicates=update_duplicates,
                                   mapi_key=mapi_key,
                                   use_full_uri=use_full_uri,
                                   runs=runs)

    def generate_doc(self, dir_name, vasprun_files):
        """
        Overridden.

        Build the task document for one calculation directory.

        Args:
            dir_name: directory of the calculation.
            vasprun_files: mapping of task name to vasprun file name;
                every entry is handed to ``self.process_vasprun``.

        Returns:
            The task document (dict), or None when any step raises. In
            that case the traceback is printed and logged as an error.
        """
        try:
            fullpath = os.path.abspath(dir_name)
            # start from a shallow copy so additional_fields is not mutated
            d = dict(self.additional_fields)
            d["dir_name"] = fullpath
            d["schema_version"] = VaspToDbTaskDrone.__version__
            d["calculations"] = [
                self.process_vasprun(dir_name, taskname, filename)
                for taskname, filename in vasprun_files.items()]
            d1 = d["calculations"][0]
            d2 = d["calculations"][-1]

            # Now map some useful info to the root level.
            for root_key in ["completed_at", "nsites", "unit_cell_formula",
                             "reduced_cell_formula", "pretty_formula",
                             "elements", "nelements", "cif", "density",
                             "is_hubbard", "hubbards", "run_type"]:
                d[root_key] = d2[root_key]
            d["chemsys"] = "-".join(sorted(d2["elements"]))

            # store any overrides to the exchange correlation functional
            xc = d2["input"]["incar"].get("GGA")
            if xc:
                xc = xc.upper()
            d["input"] = {"crystal": d1["input"]["crystal"],
                          "is_lasph": d2["input"]["incar"].get("LASPH",
                                                               False),
                          "potcar_spec": d1["input"].get("potcar_spec"),
                          "xc_override": xc}

            # Anonymous formula: sorted amounts labelled A, B, C, ...
            vals = sorted(d2["reduced_cell_formula"].values())
            d["anonymous_formula"] = {string.ascii_uppercase[i]: float(v)
                                      for i, v in enumerate(vals)}
            d["output"] = {
                "crystal": d2["output"]["crystal"],
                "final_energy": d2["output"]["final_energy"],
                "final_energy_per_atom":
                    d2["output"]["final_energy_per_atom"]}
            d["name"] = "vasp"

            p = d2["input"]["potcar_type"][0].split("_")
            pot_type = p[0]
            # A potcar type with no "_<functional>" suffix (a single
            # token) is treated as LDA. The check must count the tokens
            # in p; the previous len(pot_type) == 1 measured the length
            # of the string itself and so never matched e.g. "PAW".
            functional = "lda" if len(p) == 1 else "_".join(p[1:])
            d["pseudo_potential"] = {"functional": functional.lower(),
                                     "pot_type": pot_type.lower(),
                                     "labels": d2["input"]["potcar"]}

            if len(d["calculations"]) == len(self.runs) or \
                    next(iter(vasprun_files)) != "relax1":
                d["state"] = "successful" if d2["has_vasp_completed"] \
                    else "unsuccessful"
            else:
                d["state"] = "stopped"

            # may downgrade d["state"] to "error"
            d["analysis"] = analysis_and_error_checks(d)

            sg = SpacegroupAnalyzer(
                Structure.from_dict(d["output"]["crystal"]), 0.1)
            d["spacegroup"] = {"symbol": sg.get_spacegroup_symbol(),
                               "number": sg.get_spacegroup_number(),
                               "point_group": sg.get_point_group(),
                               "source": "spglib",
                               "crystal_system": sg.get_crystal_system(),
                               "hall": sg.get_hall()}
            d["last_updated"] = datetime.datetime.today()
            return d
        except Exception:
            import traceback
            tb = traceback.format_exc()
            print(tb)
            logger.error("Error in " + os.path.abspath(dir_name) +
                         ".\n" + tb)
            return None

    def post_process(self, dir_name, d):
        """
        customization:
            adds system.json to the dictionary

        Args:
            dir_name: directory of the calculation.
            d: task document; modified in place. Gains "hkl" and
                "ligand" (when ``system.json`` exists in dir_name),
                "run_stats", and, if ``self.use_full_uri`` is set, a
                "dir_name" rewritten by ``get_uri``.
        """
        logger.info("Post-processing dir:{}".format(dir_name))
        fullpath = os.path.abspath(dir_name)
        filename = os.path.join(fullpath, "system.json")
        if os.path.exists(filename):
            with open(filename, "r") as f:
                system = json.load(f)
            d["hkl"] = system.get("hkl")
            d["ligand"] = system.get("ligand")

        # from pymatgen-db
        # Run stats that are generally not in vasprun.xml.
        # NOTE: no per-run stats are collected in this method, so
        # run_stats starts empty and every "overall" total is 0.
        run_stats = {}
        try:
            overall_run_stats = {}
            for key in ["Total CPU time used (sec)", "User time (sec)",
                        "System time (sec)", "Elapsed time (sec)"]:
                overall_run_stats[key] = sum(v[key]
                                             for v in run_stats.values())
            run_stats["overall"] = overall_run_stats
        except Exception:
            # best effort: keep whatever was gathered, but do not
            # swallow KeyboardInterrupt/SystemExit as a bare except would
            logger.error("Bad run stats for {}.".format(fullpath))
        d["run_stats"] = run_stats

        # Convert to full uri path.
        if self.use_full_uri:
            d["dir_name"] = get_uri(dir_name)

        logger.info("Post-processed " + fullpath)
def get_uri(dir_name):
    """
    Customized version of the original pymatgen-db version.
    Customization required because same job folder on hipergator
    gets different uri for different login nodes.

    Returns the URI path for a directory, in the form
    ``<host label>:<absolute path>``.

    Args:
        dir_name:
            A directory name.

    Returns:
        "<hostname>:<abspath of dir_name>", where <hostname> is the
        second dot-separated label of the name returned by
        socket.gethostbyaddr for this machine. If that lookup fails or
        the name has no second label, socket.gethostname() is used
        instead.
    """
    fullpath = os.path.abspath(dir_name)
    try:
        # Use the second label of the resolved name rather than the
        # first, so the node-specific leading label is dropped.
        hostname = socket.gethostbyaddr(
            socket.gethostname())[0].split('.')[1]
    except Exception:
        # Lookup failure or a name without dots: fall back to the plain
        # host name. Unlike a bare except, this lets KeyboardInterrupt
        # and SystemExit propagate.
        hostname = socket.gethostname()
    return "{}:{}".format(hostname, fullpath)
# remove coordination number
def analysis_and_error_checks(d, max_force_threshold=0.5,
                              volume_change_threshold=0.2):
    """
    Compute summary quantities and collect warnings/errors for a task
    document.

    Args:
        d: task document. Read keys: d["input"]["crystal"],
            d["output"]["crystal"], d["calculations"] and d["state"].
            d["state"] is set to "error" when an error is recorded.
        max_force_threshold: an error is recorded when the largest
            force norm of the last ionic step exceeds this value.
        volume_change_threshold: a warning is recorded when the
            absolute fractional volume change exceeds this value.

    Returns:
        dict with keys "delta_volume", "max_force",
        "percent_delta_volume", "warnings", "errors", "bandgap", "cbm",
        "vbm", "is_gap_direct" and "bv_structure".
    """
    vol_before = d["input"]["crystal"]["lattice"]["volume"]
    vol_after = d["output"]["crystal"]["lattice"]["volume"]
    vol_change = vol_after - vol_before
    vol_change_fraction = vol_change / vol_before

    # band-edge data is taken from the last calculation
    last_output = d["calculations"][-1]["output"]
    band_gap = last_output["bandgap"]
    conduction_min = last_output["cbm"]
    valence_max = last_output["vbm"]
    direct_gap = last_output["is_gap_direct"]

    found_warnings = []
    found_errors = []

    if abs(vol_change_fraction) > volume_change_threshold:
        found_warnings.append(
            "Volume change > {}%".format(volume_change_threshold * 100))

    # Try to decorate the final structure with oxidation states; on
    # failure the undecorated structure is kept and the problem logged.
    oxi_structure = Structure.from_dict(d["output"]["crystal"])
    try:
        oxi_structure = BVAnalyzer().get_oxi_state_decorated_structure(
            oxi_structure)
    except ValueError as err:
        logger.error("Valence cannot be determined due to {e}."
                     .format(e=err))
    except Exception as err:
        logger.error("BVAnalyzer error {e}.".format(e=str(err)))

    max_force = None
    if d["state"] == "successful":
        first_params = d["calculations"][0]["input"]["parameters"]
        if first_params.get("NSW", 0) > 0:
            # largest force norm in the final ionic step
            final_forces = last_output["ionic_steps"][-1]["forces"]
            max_force = max(np.linalg.norm(vec) for vec in final_forces)
            if max_force > max_force_threshold:
                found_errors.append("Final max force exceeds {} eV"
                                    .format(max_force_threshold))
                d["state"] = "error"

            # NOTE(review): nesting reconstructed from flattened source;
            # confirm this validity check belongs inside this branch.
            final_structure = Structure.from_dict(d["output"]["crystal"])
            if not final_structure.is_valid():
                found_errors.append("Bad structure (atoms are too close!)")
                d["state"] = "error"

    return {"delta_volume": vol_change,
            "max_force": max_force,
            "percent_delta_volume": vol_change_fraction,
            "warnings": found_warnings,
            "errors": found_errors,
            "bandgap": band_gap,
            "cbm": conduction_min,
            "vbm": valence_max,
            "is_gap_direct": direct_gap,
            "bv_structure": oxi_structure.as_dict()}