Abinit几何优化收敛监控与可视化工具

March 19, 2026
Published in 计算工具

Abstract

在计算材料科学和第一性原理研究中,几何优化的收敛过程常常耗时较长且中途可能出现不稳定的振荡。本文为大家提供了一款自动化的 Python 监控脚本,用于批量读取和可视化 Abinit 软件在优化过程中记录的轨迹数据,帮助研究人员快速掌控计算进度与系统状态。

Keywords: Python, Abinit, 数据可视化, 几何优化, 计算化学

程序背景与功能详解

在处理大批量的几何优化计算任务时,逐一打开输出日志检查收敛过程不但极其繁琐,也难以直观地感知系统的变化趋势。为此,该 Python 监控程序实现了如下核心功能操作:

  1. 自动扫描与检索:支持对目标目录下大量符合匹配模式(如 pattern)的工作文件进行自动化遍历。
  2. NetCDF 高效解析:精确解析 Abinit 计算中产生的 runo_HIST.nc 文件,无损提取体系在每个离子步的系统总能量(Etot)以及原子的最大受力(Fmax)。
  3. 计算状态诊断:扫描 .abo 日志文件的尾段关键字符,快速智能分辨计算任务处于“已完成”(DONE)、“仍在运行”(RUNNING)还是“由于报错或异常终止”(ERROR)状态。
  4. 多维度可视化作图:依靠 matplotlib 可视化功能,输出基于步骤变化的详细收敛曲线,并支持相对坐标、单位转换和整体横评。

运行逻辑与流程图

脚本的运行流程依次通过遍历寻址、读取数据、处理状态和作图这四个过程实现了上述目标。相关的处理流程图如下所示:

收敛监控流程图

脚本核心源码展示

以下是实现上述操作的监控脚本核心源代码。脚本调用了内置的 globre 提取对应规则的数据体系,并重度依赖于数据提取模块 netCDF4 处理二进制科学文件。

#!/usr/bin/env python3
"""
Monitor geometry optimization convergence for all abinit calculations.
Reads runo_HIST.nc files to extract total energy at each ionic step and plots convergence curves.

Usage:
    python monitor_convergence.py [options]

Options:
    --calc-dir  Root calculation directory (default: ../calculation)
    --pattern   Directory name glob pattern (default: al27zr1o1s-*)
    --output    Output PNG path (default: convergence.png)
    --eV        Use eV as energy unit (default: Ha)
    --relative  Show energy relative to first step (more intuitive)
    --list      Print status summary only, skip plotting
    --overview  Also generate an overview plot with all calculations overlaid
"""

import os
import re
import glob
import argparse
import numpy as np
import netCDF4 as nc
import matplotlib
matplotlib.use('Agg')  # 无显示屏时也能保存图片
import matplotlib.pyplot as plt
import matplotlib.cm as cm


HA2EV = 27.211396132  # 1 Hartree = 27.211 eV


def find_calc_dirs(calc_root: str, pattern: str) -> list[str]:
    """按数字顺序返回匹配的计算目录列表。"""
    dirs = glob.glob(os.path.join(calc_root, pattern))
    def sort_key(d):
        m = re.search(r'-(\d+)$', os.path.basename(d))
        return int(m.group(1)) if m else 0
    return sorted(dirs, key=sort_key)


def read_hist_nc(hist_path: str) -> dict | None:
    """从 HIST.nc 文件中读取关键变量。返回 None 表示文件不存在或读取失败。"""
    if not os.path.isfile(hist_path):
        return None
    try:
        ds = nc.Dataset(hist_path, 'r')
        etotal = np.array(ds.variables['etotal'][:])  # Ha
        # Try to read max force (fcart: [nstep, natom, 3])
        fcart = None
        if 'fcart' in ds.variables:
            fcart_raw = np.array(ds.variables['fcart'][:])  # Ha/Bohr
            fcart = np.sqrt((fcart_raw ** 2).sum(axis=-1)).max(axis=-1)  # max atomic force per step
        ds.close()
        return {'etotal': etotal, 'fmax': fcart, 'nstep': len(etotal)}
    except Exception as e:
        print(f"  [WARNING] Failed to read {hist_path}: {e}")
        return None


def check_convergence(abo_path: str) -> str:
    """从 .abo 文件尾部判断是否已收敛/完成。"""
    if not os.path.isfile(abo_path):
        return 'NO_ABO'
    try:
        with open(abo_path, 'r', errors='ignore') as f:
            tail = f.read()[-8000:]  # read only the tail
        if 'Voluntary context switches' in tail or 'Calculation completed' in tail:
            if 'converged' in tail.lower() or 'DATASET' in tail:
                return 'DONE'
            return 'DONE'
        if 'error' in tail.lower() or 'ERROR' in tail:
            return 'ERROR'
        return 'RUNNING'
    except Exception:
        return 'UNKNOWN'


def print_summary(results: list[dict], use_ev: bool):
    """打印所有计算的文本摘要。"""
    unit = 'eV' if use_ev else 'Ha'
    conv = HA2EV if use_ev else 1.0

    header = f"{'Calculation':<25} {'Status':<10} {'Steps':>6} {'E_first':>16} {'E_last':>16} {'dE':>14}  ({unit})"
    print(header)
    print('-' * len(header))
    for r in results:
        name = r['name']
        status = r['status']
        if r['data'] is None:
            print(f"{name:<25} {status:<10} {'N/A':>6} {'N/A':>16} {'N/A':>16} {'N/A':>14}")
            continue
        etotal = r['data']['etotal'] * conv
        nstep = len(etotal)
        e0 = etotal[0]
        e_last = etotal[-1]
        delta = e_last - e0
        print(f"{name:<25} {status:<10} {nstep:>6} {e0:>16.6f} {e_last:>16.6f} {delta:>14.6f}")


def plot_convergence(results: list[dict], output_path: str, use_ev: bool, relative: bool):
    """绘制所有计算的能量收敛曲线,每个计算一个子图。"""
    # 只绘制有数据的
    valid = [r for r in results if r['data'] is not None]
    if not valid:
        print("[ERROR] No HIST.nc data found to plot.")
        return

    n = len(valid)
    ncols = min(4, n)
    nrows = (n + ncols - 1) // ncols

    unit_label = 'eV' if use_ev else 'Ha'
    conv = HA2EV if use_ev else 1.0

    fig, axes = plt.subplots(nrows, ncols,
                             figsize=(4.5 * ncols, 3.5 * nrows),
                             squeeze=False)
    fig.suptitle('Geometry Optimization Convergence Monitor', fontsize=14, fontweight='bold', y=1.01)

    colors = cm.tab10(np.linspace(0, 1, 10))

    for idx, r in enumerate(valid):
        ax = axes[idx // ncols][idx % ncols]
        data = r['data']
        etotal = data['etotal'] * conv
        steps = np.arange(1, len(etotal) + 1)

        y = etotal - etotal[0] if relative else etotal
        ylabel = f'dE ({unit_label}, rel. step 1)' if relative else f'E_total ({unit_label})'
        color = colors[idx % 10]
        ax.plot(steps, y, 'o-', color=color, markersize=3, linewidth=1.2)

        # Plot max force on secondary y-axis if available
        if data['fmax'] is not None:
            ax2 = ax.twinx()
            ax2.plot(steps, data['fmax'], 's--', color='gray',
                     markersize=2, linewidth=0.8, alpha=0.6, label='Fmax (Ha/Bohr)')
            ax2.set_ylabel('Fmax (Ha/Bohr)', fontsize=7, color='gray')
            ax2.tick_params(axis='y', labelsize=6, colors='gray')

        # Status annotation
        status_color = {'DONE': 'green', 'RUNNING': 'orange',
                        'ERROR': 'red'}.get(r['status'], 'gray')
        ax.set_title(f"{r['name']}\n[{r['status']}]",
                     fontsize=8, color=status_color)
        ax.set_xlabel('Ionic step', fontsize=7)
        ax.set_ylabel(ylabel, fontsize=7)
        ax.tick_params(labelsize=6)
        ax.grid(True, alpha=0.3)

        # 标出最低能量点
        min_idx = np.argmin(etotal)
        ax.axvline(x=steps[min_idx], color='blue', linewidth=0.8,
                   linestyle=':', alpha=0.5)
        ax.annotate(f'min: {etotal[min_idx]:.4f}',
                    xy=(steps[min_idx], y[min_idx]),
                    xytext=(5, 5), textcoords='offset points',
                    fontsize=5, color='blue')

    # 隐藏多余子图
    for idx in range(len(valid), nrows * ncols):
        axes[idx // ncols][idx % ncols].set_visible(False)

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    print(f"\n[Done] Convergence plot saved to: {output_path}")


def plot_overview(results: list[dict], output_path: str, use_ev: bool):
    """将所有计算画在同一张图上(overview)。"""
    valid = [r for r in results if r['data'] is not None]
    if not valid:
        print("[INFO] No valid data for overview.")
        return

    conv = HA2EV if use_ev else 1.0
    unit_label = 'eV' if use_ev else 'Ha'

    fig, ax = plt.subplots(figsize=(12, 6))
    colors = cm.tab20(np.linspace(0, 1, max(len(valid), 1)))

    for idx, r in enumerate(valid):
        etotal = r['data']['etotal'] * conv
        steps = np.arange(1, len(etotal) + 1)
        delta = etotal - etotal[0]  # energy relative to first step
        ax.plot(steps, delta, '-', color=colors[idx], linewidth=1,
                label=r['name'], alpha=0.8)

    ax.set_xlabel('Ionic step', fontsize=11)
    ax.set_ylabel(f'dE ({unit_label}, rel. to step 1)', fontsize=11)
    ax.set_title('Energy Convergence Overview (all calculations)', fontsize=13, fontweight='bold')
    ax.axhline(0, color='black', linewidth=0.5, linestyle='--')
    ax.grid(True, alpha=0.3)

    if len(valid) <= 20:
        ax.legend(fontsize=6, ncol=2, loc='upper right')

    overview_path = output_path.replace('.png', '_overview.png')
    plt.tight_layout()
    plt.savefig(overview_path, dpi=150, bbox_inches='tight')
    print(f"[Done] Overview plot saved to: {overview_path}")


def main():
    parser = argparse.ArgumentParser(description='Monitor abinit geometry optimization convergence')
    parser.add_argument('--calc-dir', default='../calculation',
                        help='Root calculation directory (default: ../calculation)')
    parser.add_argument('--pattern', default='al27zr1o1s-*',
                        help='Directory name glob pattern (default: al27zr1o1s-*)')
    parser.add_argument('--output', default='convergence.png',
                        help='Output PNG path (default: convergence.png)')
    parser.add_argument('--eV', action='store_true',
                        help='Use eV as energy unit (default: Ha)')
    parser.add_argument('--relative', action='store_true',
                        help='Show energy relative to the first step')
    parser.add_argument('--list', action='store_true',
                        help='Print summary only, skip plotting')
    parser.add_argument('--overview', action='store_true',
                        help='Also generate an overview plot with all calculations overlaid')
    args = parser.parse_args()

    # Support both absolute and relative paths for --calc-dir
    script_dir = os.path.dirname(os.path.abspath(__file__))
    if os.path.isabs(args.calc_dir):
        calc_root = args.calc_dir
    else:
        calc_root = os.path.normpath(os.path.join(script_dir, args.calc_dir))

    print(f"Scanning: {calc_root}")
    print(f"Pattern:  {args.pattern}\n")

    calc_dirs = find_calc_dirs(calc_root, args.pattern)
    if not calc_dirs:
        print(f"[ERROR] No directories matching '{args.pattern}' found in {calc_root}.")
        return

    results = []
    for d in calc_dirs:
        name = os.path.basename(d)
        hist_path = os.path.join(d, 'runo_HIST.nc')
        abo_path = os.path.join(d, 'run.abo')

        data = read_hist_nc(hist_path)
        status = check_convergence(abo_path)

        results.append({'name': name, 'data': data, 'status': status})
        mark = '✓' if data else '✗'
        print(f"  [{mark}] {name:<25}  steps={str(data['nstep']) if data else 'N/A':<4}  status={status}")

    print(f"\nFound {len(results)} calculation dirs, "
          f"{sum(1 for r in results if r['data'] is not None)} with HIST.nc data.\n")

    print_summary(results, args.eV)

    if not args.list:
        output_path = os.path.join(script_dir, args.output)
        plot_convergence(results, output_path, args.eV, args.relative)
        if args.overview:
            plot_overview(results, output_path, args.eV)


if __name__ == '__main__':
    main()

使用配置指南

程序的最后模块设计了一份功能完整的命令行分析选项(argparse 库),使用人员完全可以依据实际的任务需要做出调整:

  1. 执行快速绘图:在基础计算场景下,可以直接无选项运行脚本,即以默认参数读取 ../calculation 下匹配体系前缀的任务:
    python monitor_convergence.py
  2. 多参数高级配置:用户如果想要获取各计算轨迹相对起点的精确下降数值,并将所有数据线交汇叠加至一张全景图中作比对,同时输出单位设为 eV,可以执行以下命令操作:
    python monitor_convergence.py --eV --relative --overview
  3. 文本汇总模式:如果服务器没有搭载图形组件或不想产生绘图卡顿,附加 --list 可以实现单纯的数据摘要报表打印:
    python monitor_convergence.py --list

得益于这一全面监控手段的设计,复杂庞大体系中多发任务的状态检查也将变得更加清晰可见。