Fitting a variable number of Lorentzian peaks to data from text files

0 votes
1 answer
63 views
Asked 2025-04-14 17:36

I have the code working, but it is very slow and it does not return the correct full width at half maximum. Is there a way to get the FWHM values and speed up the processing without degrading the quality of the downstream analysis?

    # -*- coding: utf-8 -*-
    """
    Created on Fri Feb 16 11:38:14 2024
    
    @author: tppoirier96
    """
    
    # importing packages 
    import pandas as pd 
    import glob
    import numpy as np
    from scipy.optimize import leastsq
    import matplotlib.pyplot as plt
    
    folder_path = 'folder_path'
    # Locating folder with spectral data
    file_list = glob.glob(folder_path + "/*.txt")
    # Taking all text files from folder
    main_dataframe = pd.DataFrame(pd.read_table(file_list[0]))
    # Compiling a dataframe for each spectrum
    
    #SpectraType = str(input('What kind of spectral data is this?'))
    
    for i in range(1, len(file_list)):
        # For the remaining files in the glob (the first is already loaded above)
        data = pd.read_table(file_list[i], sep="\t")
        # Read each text file as dataframe
        df = pd.DataFrame(data)
        # Convert each file to a dataframe
        main_dataframe = pd.concat([main_dataframe, df], axis = 1)
        # Append each dataframe to larger dataframe
    
    DataDF = pd.concat([main_dataframe.iloc[67:,0], main_dataframe.iloc[67:,1::2]], axis=1)
    # Cutting off the first 110 wavenumbers due to substantial noise
    # Also removing the wavenumbers for all but the first spectrum
    # Compiling each modified spectrum
    SumDataDF = DataDF.describe()
    # Summary for tracking important numbers
    DataNorm = pd.DataFrame()
    # Initializing new dataframe
    DataLorentz = pd.DataFrame()
    # Initializing new dataframe
    
    
    DataNorm = (DataDF-DataDF.min())/(DataDF.max()-DataDF.min())
    # Normalizing data
    DataNorm = pd.concat([DataDF.iloc[:,0], DataNorm], axis=1)
    # Adding wavenumbers back for visualization
    
    xData = DataNorm.iloc[:,0].array
    # Initializing wavenumber array
    E2GArray = np.zeros((4,len(file_list)))
    # Initializing E2G location array
    
    # Standard Lorentzian function
    def norm_lorentzian(x, x0, a, gamma):
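        # Note: gamma here is the half width at half maximum (HWHM), so FWHM = 2*gamma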
        return a * gamma**2 / ( gamma**2 + ( x - x0 )**2)
    
    # For multiple peaks
    def multi_lorentz(x,params):
        off = params[0]
        paramsRest = params[1:]
        assert not (len(paramsRest )%3)
        return off + sum([norm_lorentzian(x, *paramsRest[i:i+3]) for i in range(0, len(paramsRest), 3)])
    
    # Least squares regression method
    def res_multi_lorentz( params, xData, yData ):
        diff = [multi_lorentz(x, params) - y for x, y in zip(xData, yData)]
        return diff
    
    # Begin processing
    for i in range(1,len(DataNorm.columns)-1):
        yData = DataNorm.iloc[:,i].array
        # Converting Spectra intensities to convenient type
        #NumPeaks = int(input('How many peaks to fit?\n'))
        # Collecting number of peaks to fit
        counter = 0
        # Break condition
        generalWidth = 10
        # Starting HWHM
        yDataLoc = yData
        # Duplicating intensities for convenience
        startValues = [max(DataNorm.iloc[:,i])]
        # Taking the maximum (should be 1.0)
        while max( yDataLoc ) - min( yDataLoc ) > .1:
        # I'm not sure about this condition
            counter += 1
            # Break condition increment
            print(str(counter)+ " while")
            if counter > 20:
                print("Reached max iterations!")
                break
            minP = np.argmin( yDataLoc )
            # Index of minimum intensity (what wavenumber the baseline is)
            minY = yData[ minP ]
            # What the minimum intensity is
            x0 = xData[ minP ]
            # Setting the initial E2G position as the minimum
            startValues += [ x0, minY - max( yDataLoc ), generalWidth ]
            # Appending values to feed to lorentz function
            popt, ier = leastsq(res_multi_lorentz, startValues, args=(xData, yData))
            # Returns result of optimization and integer flag for solution
            # popt is a concatenation of final values similar to startValues
            # The final popt values are fed to the line below for each spectral intensity set
            yDataLoc = [y - multi_lorentz(x, popt) for x, y in zip(xData, yData)]
            # Difference between the y-values and the fitted Lorentzian curves
            if 1300 < x0 < 1400:
            # If the E2G is found, append it to the array for data compilation
                E2GArray[0,i-1] = i
                # Tracking which data file it came from
                E2GArray[2,i] = x0
                # Storing location of E2G
                E2GArray[3,i] = generalWidth*2
                # Storing FWHM for later
        print(str(i)+" for")
        testData = [multi_lorentz(x,popt) for x in xData]
        # Makes array of lorentzian fits
        DataLorentz = pd.concat([DataLorentz, pd.Series(testData)], axis=1)
    
        fig = plt.figure()
        ax = fig.add_subplot( 1, 1, 1 )
        ax.plot( xData, yData, label="Data" )
        ax.plot( xData, testData, label="Lorentz")
        plt.xlabel("Wavenumber")
        plt.ylabel("Intensity relative to E2G")
        plt.legend(loc="upper left")
        plt.show()
    
    # creating a new csv file with the dataframe we created
    # cwd = os.getcwd()
    # print(cwd)
    #main_dataframe.to_excel('Mapped Data.xlsx')

I would like to process roughly 4000 files quickly; right now it looks like it would take about a week. I want to record the position of the main peak (expected, in the innermost if statement, to fall between 1300 and 1400) as well as its FWHM, which is why I store it in the fourth row of E2GArray.
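
For reference, with the norm_lorentzian parametrization above, each fitted peak's FWHM is simply twice the fitted gamma, so it can be read directly from popt instead of the fixed generalWidth. A minimal sketch (the fitted_peaks helper is hypothetical, not part of the original script):

    def fitted_peaks(popt):
        """Unpack a multi_lorentz parameter vector into (x0, amplitude, FWHM) triples."""
        offset, rest = popt[0], popt[1:]
        peaks = []
        for j in range(0, len(rest), 3):
            x0, a, gamma = rest[j:j + 3]
            peaks.append((x0, a, 2 * abs(gamma)))  # FWHM of a Lorentzian is 2*gamma
        return peaks

    # Example: store the fitted E2G position and width instead of the initial guesses
    # for x0, a, fwhm in fitted_peaks(popt):
    #     if 1300 < x0 < 1400:
    #         E2GArray[2, i] = x0
    #         E2GArray[3, i] = fwhm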

1 Answer


Methodology

There are a few challenges to address in this problem:

  • baseline management (remove the background so the baseline is flat);
  • peak identification (locate the peaks);
  • peak regression (regress a variable-peak model with sensible initial guesses).

Then the processing is automated over each file.

Minimal reproducible example

The class below implements this methodology:

import inspect
import itertools
import pathlib

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pybaselines import Baseline
from scipy import optimize, stats, signal
    
class SpectroscopySolver:

    @staticmethod
    def peak_model(x, loc, scale, height):
        """Lorentzian peak"""
        law = stats.cauchy(loc=loc, scale=scale)
        return height * law.pdf(x) / law.pdf(loc)

    @classmethod
    def m(cls):
        """Number of model parameters"""
        return len(inspect.signature(cls.peak_model).parameters) - 1

    @classmethod
    def model(cls, x, *p):
        """Variable peak model"""
        m = cls.m()
        n = len(p) // m
        y = np.zeros_like(x)
        for i in range(n):
            y += cls.peak_model(x, *p[i * m:(i + 1) * m])
        return y

    @classmethod
    def loss_factory(cls, xdata, ydata):
        """Least Square Factory"""
        def wrapped(p):
            """Least Square Loss function"""
            return 0.5 * np.sum(np.power(ydata - cls.model(xdata, *p), 2))
        return wrapped

    @classmethod
    def fit(cls, xdata, ydata, prominence=400.):
        """Fitting methodology"""

        # Baseline management:
        baseline_fitter = Baseline(x_data=xdata)
        baseline, params = baseline_fitter.asls(ydata)
        yb = ydata - baseline

        # Peak identifications:
        peaks, bases = signal.find_peaks(yb, prominence=prominence)
        lefts, rights = bases["left_bases"], bases["right_bases"]
        
        # Initial guess tuning:
        x0 = list(itertools.chain(*[[xdata[i], 0.1, p] for i, p in zip(peaks, bases["prominences"])]))
        bounds = list(itertools.chain(*[
            [(xdata[lefts[i]], xdata[rights[i]])] + [(0., np.inf)] * (cls.m() - 1) for i in range(len(peaks))
        ]))

        # Least Square Loss Minimization:
        loss = cls.loss_factory(xdata, yb)
        solution = optimize.minimize(loss, x0=x0, bounds=bounds)

        # Estimate:
        yhat = cls.model(xdata, *solution.x)
        ybhat = yhat + baseline
        
        # Plot:
        fig, axe = plt.subplots()
        axe.scatter(xdata, ydata, marker=".", color="gray", label="Data")
        axe.scatter(xdata[peaks], ydata[peaks], label="Peaks")
        axe.plot(xdata, baseline, "--", label="Baseline")
        axe.plot(xdata, ybhat, label="Fit")
        axe.set_title("Spectrogram")
        axe.set_xlabel(r"Wavenumber, $\nu$")
        axe.set_ylabel(r"Raw Signal, $g(\nu)$")
        axe.legend()
        axe.grid()
        
        return axe
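
The fit method above only returns the Matplotlib axes. If you also need the numeric peak positions and FWHM values (as asked in the question), they can be recovered from the optimizer result. A minimal sketch, assuming fit is modified to also return solution.x (the peak_parameters helper below is hypothetical):

def peak_parameters(solution_x, m=3):
    """Split the flat parameter vector into (position, FWHM, height) records."""
    params = np.asarray(solution_x).reshape(-1, m)
    return [
        {"position": loc, "fwhm": 2 * scale, "height": height}  # FWHM of a Lorentzian is 2 * scale
        for loc, scale, height in params
    ]

# Example, assuming fit returns (axe, solution.x):
# axe, p = solver.fit(data.x, data.y)
# e2g = [peak for peak in peak_parameters(p) if 1300 < peak["position"] < 1400]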

Next, it is just a matter of automating the processing over each file:

files = list(pathlib.Path("raman/").glob("*.txt"))
solver = SpectroscopySolver()

for file in files:
    
    data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
    solver.fit(data.x, data.y)

It performs reasonably well on your dataset.

(Figures: three example spectrograms showing the data, detected peaks, fitted baseline, and resulting fit.)

Tuning

You can steer this approach by:

  • choosing an appropriate baseline fitter (see pybaselines);
  • tuning sensible initial guesses to better guide the gradient descent;
  • refining the bounds to ensure better convergence;
  • filtering peaks by prominence (see the sketch after this list);
  • putting a negative sign in front of the peak model if you need to fit downward peaks.
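
For instance, a short sketch of two of these knobs, reusing data and SpectroscopySolver from above (the arpls baseline and the prominence value are illustrative choices, not tuned for your spectra):

# Pick up weaker peaks by lowering the prominence threshold
solver = SpectroscopySolver()
solver.fit(data.x, data.y, prominence=150.)

# Inside fit, swap the baseline fitter for another pybaselines method, e.g.:
# baseline, params = baseline_fitter.arpls(ydata)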
