Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a fast cross-correlation method to Series and DataFrame objects #1920

Closed
cpcloud opened this issue Sep 16, 2012 · 1 comment
Closed

Add a fast cross-correlation method to Series and DataFrame objects #1920

cpcloud opened this issue Sep 16, 2012 · 1 comment

Comments

@cpcloud
Copy link
Member

cpcloud commented Sep 16, 2012

For some reason there doesn't seem to be a built in cross-correlation method in NumPy that is fast for large input arrays. Hence this code (it computes the CCF using FFTs, I know there's one in statsmodels, but mine has more options :P, and this code was written somewhat as an exercise in understanding the cross correlation).

I adapted it to Pandas because Pandas totally rocks for organizing and munging, so I thought I would share this code. The functions not listed here are available upon request if there's interest in adding this to Pandas.

"""
Module for cross-correlation.
"""

import numpy as np
import pandas as pd

from span.utils import nextpow2, pad_larger, get_fft_funcs, detrend_mean


def autocorr(x, n):
    """Compute the autocorrelation of `x`

    Parameters
    ----------
    x : array_like
        Input array
    n : int
        Number of fft points

    Returns
    -------
    r : array_like
        The autocorrelation of `x`.
    """
    ifft, fft = get_fft_funcs(x)
    return ifft(np.absolute(fft(x, n)) ** 2.0, n)


def crosscorr(x, y, n):
    """Compute the cross correlation of `x` and `y`

    Parameters
    ----------
    x, y : array_like
    n : int

    Returns
    -------
    c : array_like
        Cross correlation of `x` and `y`
    """
    ifft, fft = get_fft_funcs(x, y)
    return ifft(fft(x, n) * fft(y, n).conj(), n)


def matrixcorr(x, nfft=None):
    """Cross-correlation of the columns in a matrix

    Parameters
    ----------
    x : array_like
        The matrix from which to compute the cross correlations of each column
        with the others

    Returns
    -------
    c : array_like
    """
    m, n = x.shape
    if nfft is None:
        nfft = int(2 ** nextpow2(2 * m - 1))
    ifft, fft = get_fft_funcs(x)
    X = fft(x.T, nfft)
    Xc = X.conj()
    mx, nx = X.shape
    c = np.empty((mx ** 2, nx), dtype=X.dtype)
    for i in xrange(n):
        c[i * n:(i + 1) * n] = X[i] * Xc
    return ifft(c, nfft).T, m


def xcorr(x, y=None, maxlags=None, detrend=detrend_mean, scale_type='normalize'):
    """Compute the cross correlation of `x` and `y`.

    This function computes the cross correlation of `x` and `y`. It uses the
    equivalence of the cross correlation with the negative convolution computed
    using a FFT to achieve much faster cross correlation than is possible with
    the traditional signal processing definition.

    By default it computes the cross correlation at each of 1 - maxlags to maxlags,
    scaled by the lag 0 cross correlation after mean centering the data.

    Note that it is not necessary for `x` and `y` to be the same size.

    Parameters
    ----------
    x : array_like
    y : array_like, optional
        If not given or is equal to `x`, the autocorrelation is computed.
    maxlags : int, optional
        The highest lag at which to compute the cross correlation.
    detrend : callable, optional
        A callable to detrend the data.
    scale_type : str, optional

    Returns
    -------
    c : pandas.Series or pandas.DataFrame
    """
    def unbiased(c, lsize):
        """
        """
        return c / (lsize - np.abs(c.index))


    def biased(c, lsize):
        """
        """
        return c / lsize

    def normalize(c, lsize):
        """
        """
        assert c.ndim in (1, 2), 'invalid size of cross correlation array'

        if c.ndim == 1:
            cdiv = c.ix[0]
        else:
            mc, nc = c.shape
            ncsqrt = int(np.sqrt(nc))
            jkl = np.diag(np.r_[:nc].reshape((ncsqrt, ncsqrt)))
            tmp = np.sqrt(c.ix[0, jkl])
            cdiv = np.outer(tmp, tmp).ravel()
        return c / cdiv

    SCALE_FUNCTIONS = {
        None: lambda c, lsize: c,
        'none': lambda c, lsize: c,
        'unbiased': unbiased,
        'biased': biased,
        'normalize': normalize
    }

    assert x.ndim in (1, 2), 'x must be a vector or matrix'

    x = detrend(x)

    if x.ndim == 2 and np.all(np.greater(x.shape, 1)):
        assert y is None, 'y argument not allowed when x is a 2D array'
        ctmp, lsize = matrixcorr(x)
    elif y is None or y is x or np.array_equal(x, y):
        lsize = x.size
        ctmp = autocorr(x, int(2 ** nextpow2(2 * lsize - 1)))
    else:
        x, y, lsize = pad_larger(x, detrend(y))
        ctmp = crosscorr(x, y, int(2 ** nextpow2(2 * lsize - 1)))

    if maxlags is None:
        maxlags = lsize
    else:
        assert maxlags <= lsize, 'max lags must be less than or equal to %i' % lsize

    lags = np.r_[1 - maxlags:maxlags]
    return_type = pd.DataFrame if ctmp.ndim == 2 else pd.Series
    scaler = SCALE_FUNCTIONS[scale_type]
    return scaler(return_type(ctmp[lags], index=lags), lsize)

Right now I'm using it like

class Series(pd.Series):
    def __new__(cls, *args, **kwargs):
        return pd.Series.__new__(cls, *args, **kwargs).view(Series)

    def __init__(self, *args, **kwargs):
        super(Series, self).__init__(*args, **kwargs)

    def xcorr(self, other=None, maxlags=None, detrend=span.utils.detrend_mean,
              scale_type='normalize'):
        return xcorr(self, other, maxlags=maxlags, detrend=detrend,
                              scale_type=scale_type)

    def asdfasdf(self):
        return 1


class DataFrame(pd.DataFrame):
    def __init__(self, *args, **kwargs):
        super(DataFrame, self).__init__(*args, **kwargs)

    def xcorr(self, other=None, maxlags=None, detrend=span.utils.detrend_mean,
              scale_type='normalize'):
        return xcorr(self, maxlags=maxlags, detrend=detrend,
                              scale_type=scale_type)
@cpcloud
Copy link
Member Author

cpcloud commented Jan 8, 2013

Index and friends messes with cross-correlation, maybe try for this at a later date.

@cpcloud cpcloud closed this as completed Jan 8, 2013
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant