Helper functions for spectral DMD.

    # MIT License

    # Copyright (c) 2022 Andy Goldschmidt

    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:

    # The above copyright notice and this permission notice shall be included in all
    # copies or substantial portions of the Software.

    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    # SOFTWARE.

    import numpy as np

    def BigOmg(omega, ts):
        '''
        Construct time-domain coordinates for Spectral DMD.
        
        Args:
            omega (`1d-array`): 1d array of frequencies (in cycles per unit time)
            ts (`1d-array`): 1d array of times
            
        Returns:
            `nd-array`: 2*len(omega) by len(ts) array of $[\cos(2\pi\vec{\omega} t), \sin(2\pi\vec{\omega} t)]^T$
        '''
        omt = omega.reshape(-1,1)@ts.reshape(1,-1)
        return np.vstack([np.cos(2*np.pi*omt), np.sin(2*np.pi*omt)])
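

    # Illustrative usage (a sketch, not part of the original module): the 2*pi
    # factor above means `omega` is read in cycles per unit time, and the result
    # stacks the cosine block on top of the sine block.
    def _example_bigomg():
        omega = np.array([0.5, 1.0])    # hypothetical frequencies
        ts = np.linspace(0.0, 1.0, 5)   # hypothetical sample times
        Omg = BigOmg(omega, ts)
        assert Omg.shape == (2*len(omega), len(ts))
        return Omg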


    def loss(X, A, omega, ts):
        '''
        Loss function for Spectral DMD: the Frobenius norm of the residual
        $X - A\Omega(\omega)$, with $\Omega(\omega)$ given by BigOmg(omega, ts).
        
        Returns:
            float: Frobenius norm of the residual.
        '''
        return np.linalg.norm(X - A@BigOmg(omega,ts))
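

    # Quick illustration (a sketch with hypothetical data): when X is generated
    # exactly by the model, X = A@BigOmg(omega, ts), the Frobenius-norm loss is
    # zero up to round-off.
    def _example_loss(seed=0):
        rng = np.random.default_rng(seed)
        ts = np.linspace(0.0, 1.0, 50)
        omega = np.array([0.3, 0.7])
        A = rng.standard_normal((2, 2*len(omega)))
        X = A @ BigOmg(omega, ts)
        assert loss(X, A, omega, ts) < 1e-10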


    def grad_loss(X, A, omega, ts):
        '''
        Gradient of the squared loss $\|X - A\Omega(\omega)\|_F^2$ w.r.t. $\omega$
        (note that `loss` itself returns the unsquared Frobenius norm).
        
        Returns:
            `nd-array`: 1 by len(omega) array of partial derivatives.
        '''
        n_omg = len(omega)
        part2 = -4*np.pi*A.T@(X - A@BigOmg(omega,ts))
        grad_res = [0]*n_omg
        for i in range(n_omg):
            grad_res[i] += (-np.sin(2*np.pi*omega[i]*ts)*ts).dot(part2[i,:])
            grad_res[i] += (np.cos(2*np.pi*omega[i]*ts)*ts).dot(part2[n_omg + i,:])
        return np.array(grad_res).reshape(1,-1)
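

    # Finite-difference sanity check (illustrative sketch with hypothetical data):
    # the formula above is the gradient of the *squared* Frobenius loss,
    # loss(X, A, omega, ts)**2, which is what the central differences below probe.
    def _check_grad_loss(eps=1e-6, seed=0):
        rng = np.random.default_rng(seed)
        ts = np.linspace(0.0, 1.0, 50)
        omega = np.array([0.3, 0.7])
        A = rng.standard_normal((3, 2*len(omega)))
        X = rng.standard_normal((3, len(ts)))
        g = grad_loss(X, A, omega, ts).ravel()
        for j in range(len(omega)):
            d = np.zeros_like(omega)
            d[j] = eps
            fd = (loss(X, A, omega + d, ts)**2 - loss(X, A, omega - d, ts)**2)/(2*eps)
            assert np.isclose(g[j], fd, rtol=1e-3, atol=1e-6)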


    def grad_loss_j(j, X, A, omega, ts):
        '''
        Gradient of the squared loss w.r.t. $\omega_j$ (the j-th entry of `grad_loss`).
        
        Returns:
            float: Gradient of the squared loss w.r.t. $\omega_j$.
        '''
        n_omg = len(omega)
        if j > n_omg - 1:
            raise ValueError("Invalid value. Index j={} exceeds len(omega)={}.".format(j,n_omg))
            
        part2 = -4*np.pi*A.T@(X - A@BigOmg(omega,ts))
        grad_res_j = (-np.sin(2*np.pi*omega[j]*ts)*ts).dot(part2[j,:])
        grad_res_j += (np.cos(2*np.pi*omega[j]*ts)*ts).dot(part2[n_omg + j,:])
        return grad_res_j
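

    # Consistency sketch (illustrative): grad_loss_j(j, ...) should agree with the
    # j-th entry of grad_loss(...); hypothetical shapes mirror the check above.
    def _check_grad_loss_j(seed=0):
        rng = np.random.default_rng(seed)
        ts = np.linspace(0.0, 1.0, 50)
        omega = np.array([0.3, 0.7])
        A = rng.standard_normal((3, 2*len(omega)))
        X = rng.standard_normal((3, len(ts)))
        full = grad_loss(X, A, omega, ts).ravel()
        for j in range(len(omega)):
            assert np.isclose(grad_loss_j(j, X, A, omega, ts), full[j])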


    def residual_j(j, X, A, omega, ts):
        '''
        Residual for the data trajectory X using the model A, $\Omega$. The residual excludes the contribution
        of the frequency $\omega_j$.
        
        TODO:
        * Check the case where A.shape[0] = 1
        
        Returns:
            `ndarray`: residual of shape X.shape[0] by ts.shape[0]
        '''
        n_omega = len(omega)
        if j > n_omega - 1:
            raise ValueError("Invalid value. Index j={} exceeds len(omega)={}.".format(j, n_omega))
        
        j2 = j + n_omega
        indices = np.hstack([np.arange(j), np.arange(j+1,j2), np.arange(j2+1, 2*n_omega)])
        return X - A[:,indices]@(BigOmg(omega,ts)[indices, :])
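

    # Illustrative check (a sketch with hypothetical data): for noiseless data built
    # from two frequencies, dropping frequency j from the model leaves exactly the
    # j-th cosine/sine pair's contribution as the residual.
    def _example_residual_j(seed=0):
        rng = np.random.default_rng(seed)
        ts = np.linspace(0.0, 2.0, 100)
        omega = np.array([0.5, 1.25])
        A = rng.standard_normal((3, 2*len(omega)))
        X = A @ BigOmg(omega, ts)
        res = residual_j(0, X, A, omega, ts)
        keep = [0, len(omega)]     # cosine and sine columns belonging to omega[0]
        expected = A[:, keep] @ BigOmg(omega, ts)[keep, :]
        assert np.allclose(res, expected)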


    def update_A(X, omega, ts, threshold, threshold_type):
        '''
        Least-squares update of A for fixed frequencies, fitting X with
        A@BigOmg(omega, ts) via a rank-truncated SVD pseudoinverse. `threshold`
        is the rank r when threshold_type is 'count', or a relative
        singular-value cutoff when threshold_type is 'percent'.
        
        Returns:
            `nd-array`: updated A of shape X.shape[0] by 2*len(omega).
        '''
        # -- This step could get some DMD love
        U,S,Vt = np.linalg.svd(BigOmg(omega, ts), full_matrices=False)
        if threshold_type == 'count':
            r = threshold
        elif threshold_type == 'percent':
            r = np.sum(S/np.max(S) > threshold)
        else:
            raise ValueError("Unknown threshold_type: {}".format(threshold_type))
        rU = U[:,:r]
        rS = S[:r]
        rVt = Vt[:r, :]
        return X@np.conj(rVt.T)@np.diag(1/rS)@np.conj(rU.T)
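

    # Illustrative usage (a sketch with hypothetical data): with the frequencies
    # known and noiseless data, the truncated-SVD least-squares update recovers
    # the amplitude matrix A.
    def _example_update_A(seed=0):
        rng = np.random.default_rng(seed)
        ts = np.linspace(0.0, 2.0, 100)
        omega = np.array([0.5, 1.25])
        A_true = rng.standard_normal((3, 2*len(omega)))
        X = A_true @ BigOmg(omega, ts)
        A_est = update_A(X, omega, ts, threshold=2*len(omega), threshold_type='count')
        assert np.allclose(A_est, A_true)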


    def max_fft_update(result, dt):
        '''
        Return the frequency (in cycles per unit time) of the dominant FFT bin of
        `result`, with magnitudes summed over all data dimensions. `result` is
        assumed real, so only the non-negative half of the spectrum is searched.
        '''
        # Real signal means the negative-frequency half of the spectrum is the
        # complex conjugate of the positive half
        n = result.shape[1]
        n_sym = n//2 if n % 2 == 0 else n//2 + 1
        
        # Compute fft
        res_hat = np.fft.fft(result, axis=1)
        
        # Get the maximum freq. coordinate considering all data dimensions
        ires = np.argmax(np.sum(np.abs(res_hat[:,:n_sym]), axis=0))
        res_freq = np.fft.fftfreq(n, dt)[:n_sym]
        return res_freq[ires]
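

    # Illustrative usage (a sketch): a clean 3-cycles-per-unit-time tone sampled at
    # dt = 0.01 over 500 samples falls exactly on an FFT bin, so the dominant-bin
    # frequency comes back as 3.0.
    def _example_max_fft_update():
        dt = 0.01
        ts = np.arange(500)*dt
        signal = np.cos(2*np.pi*3.0*ts)[None, :]    # one data dimension, shape (1, 500)
        assert np.isclose(max_fft_update(signal, dt), 3.0)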
            

    # Accelerated proximal gradient descent
    # -----------------------------------------------------------------------------
    def optimizeWithAPGD(x0, func_f, func_g, grad_f, prox_g, beta_f, tol=1e-6, max_iter=1000, verbose=False):
        """
        Optimize with Accelerated Proximal Gradient Descent Method
            min_x f(x) + g(x)
        where f is beta-smooth and g has an inexpensive proximal operator.
        
        input
        -----
        x0 : array_like
            Starting point for the solver
        func_f : function
            Input x and return the function value of f
        func_g : function
            Input x and return the function value of g
        grad_f : function
            Input x and return the gradient of f
        prox_g : function
            Input x and a constant float number and return the prox solution
        beta_f : float
            beta smoothness constant for f
    tol : float, optional
        Tolerance on the iterate difference norm ||x_new - x|| for terminating the solver.
    max_iter : int, optional
        Maximum number of iterations for terminating the solver.
    verbose : bool, optional
        If True, print a message when the iteration limit is reached.
            
        output
        ------
        x : array_like
            Final solution
        obj_his : array_like
            Objective function value convergence history
    err_his : array_like
        Convergence history of the iterate difference norm ||x_new - x||
        exit_flag : int
            0, norm of gradient below `tol`
            1, exceed maximum number of iteration
            2, others
        """
        # initial information
        x = x0.copy()
        y = x0.copy()
        g = grad_f(y)
        t = 1.0
        #
        step_size = 1.0/beta_f
        # not recording the initial point since we do not have a measure of its optimality
        obj_his = np.zeros(max_iter)
        err_his = np.zeros(max_iter)
        
        # start iteration
        iter_count = 0
        err = tol + 1.0
        while err >= tol:
            #####
            # Accelerated proximal gradient
            x_new = prox_g(y - step_size*g, step_size)
            t_new = (iter_count - 1)/(iter_count + 2)
            y_new = x_new + t_new*(x_new - x)
            # FISTA version:
            # t_new = (1 + np.sqrt(1+4*t**2))/2
            # y_new = x_new + (t - 1)/t_new*(x_new - x)
            #####
            #
            # update information
            obj = func_f(x_new) + func_g(x_new)
            err = np.linalg.norm(x - x_new)
            #
            np.copyto(x, x_new)
            np.copyto(y, y_new)
            t = t_new
            g = grad_f(y)
            #
            obj_his[iter_count] = obj
            err_his[iter_count] = err
            #
            # check if exceed maximum number of iteration
            iter_count += 1
            if iter_count >= max_iter:
                if verbose:
                    print('Accelerated proximal gradient descent reached the maximum number of iterations')
                return x, obj_his[:iter_count], err_his[:iter_count], 1
        #
        return x, obj_his[:iter_count], err_his[:iter_count], 0
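

    # Illustrative usage (a sketch with hypothetical problem data): a small
    # LASSO-type problem with f(x) = 0.5*||M x - b||^2 (beta-smooth with
    # beta = ||M||_2^2) and g(x) = lam*||x||_1 handled through its prox
    # (soft thresholding).
    def _example_apgd(seed=0):
        rng = np.random.default_rng(seed)
        M = rng.standard_normal((20, 5))
        b = rng.standard_normal(20)
        lam = 0.1
        func_f = lambda x: 0.5*np.linalg.norm(M@x - b)**2
        func_g = lambda x: lam*np.linalg.norm(x, 1)
        grad_f = lambda x: M.T@(M@x - b)
        prox_g = lambda v, s: np.sign(v)*np.maximum(np.abs(v) - s*lam, 0.0)
        beta_f = np.linalg.norm(M, 2)**2    # Lipschitz constant of grad_f
        x0 = np.zeros(5)
        x, obj_his, err_his, exit_flag = optimizeWithAPGD(
            x0, func_f, func_g, grad_f, prox_g, beta_f, tol=1e-8, max_iter=5000)
        return x, exit_flag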