Source code for mgcpy.independence_tests.abstract_class

"""
    **Main Independence Test Abstract Class**
"""
import time
from abc import ABC, abstractmethod

import numpy as np
from scipy.spatial.distance import pdist, squareform
from scipy.stats import kendalltau, pearsonr, spearmanr, t
from mgcpy.independence_tests.utils.compute_distance_matrix import \
    compute_distance


def EUCLIDEAN_DISTANCE(x):
    return squareform(pdist(x, metric="euclidean"))


class IndependenceTest(ABC):
    """
    IndependenceTest abstract class

    Specifies the generic interface that must be implemented by
    all the independence tests in the mgcpy package.

    :param compute_distance_matrix: a function to compute the pairwise distance matrix, given a data matrix
    :type compute_distance_matrix: ``FunctionType`` or ``callable()``
    """

    def __init__(self, compute_distance_matrix=None):
        self.test_statistic_ = None
        self.test_statistic_metadata_ = None
        self.p_value_ = None
        self.p_value_metadata_ = None
        self.which_test = None

        if not compute_distance_matrix:
            compute_distance_matrix = EUCLIDEAN_DISTANCE
        self.compute_distance_matrix = compute_distance_matrix

        super().__init__()

    def get_name(self):
        """
        :return: the name of the independence test
        :rtype: string
        """
        return self.which_test

    @abstractmethod
    def test_statistic(self, matrix_X, matrix_Y):
        """
        Abstract method to compute the test statistic given two data matrices

        :param matrix_X: a ``[n*p]`` data matrix, a matrix with n samples in ``p`` dimensions
        :type matrix_X: 2D `numpy.array`

        :param matrix_Y: a ``[n*q]`` data matrix, a matrix with n samples in ``q`` dimensions
        :type matrix_Y: 2D `numpy.array`

        :return: returns a list of two items, that contains:

            - :test_statistic_: the test statistic computed using the respective independence test
            - :test_statistic_metadata_: (optional) metadata other than the test_statistic,
                                        that the independence tests computes in the process
        :rtype: list
        """

        pass

    def p_value(self, matrix_X, matrix_Y, replication_factor=1000):
        """
        Tests independence between two datasets using the independence test and permutation test.

        :param matrix_X: a ``[n*p]`` matrix, a matrix with n samples in ``p`` dimensions
        :type matrix_X: 2D `numpy.array`

        :param matrix_Y: a ``[n*q]`` matrix, a matrix with n samples in ``q`` dimensions
        :type matrix_Y: 2D `numpy.array`

        :param replication_factor: specifies the number of replications to use for
                                   the permutation test. Defaults to ``1000``.
        :type replication_factor: integer

        :return: returns a list of two items, that contains:

            - :p_value_: P-value
            - :p_value_metadata_: (optional) a ``dict`` of metadata other than the p_value,
                                 that the independence tests computes in the process
        """
        # np.random.seed(int(time.time()))

        # calculte the test statistic with the given data
        test_statistic, independence_test_metadata = self.test_statistic(matrix_X, matrix_Y)

        if self.get_name() == "mgc":
            local_correlation_matrix = independence_test_metadata["local_correlation_matrix"]

            p_local_correlation_matrix = np.zeros(local_correlation_matrix.shape)
            p_value = 1/replication_factor

            null_distribution = []
            # compute sample MGC statistic and all local correlations for each set of permuted data
            for _ in range(replication_factor):

                # use random permutations on the second data set
                premuted_matrix_Y = np.random.permutation(matrix_Y)

                temp_mgc_statistic, temp_independence_test_metadata = self.test_statistic(
                        matrix_X, premuted_matrix_Y)
                null_distribution.append(temp_mgc_statistic)
                temp_local_correlation_matrix = temp_independence_test_metadata["local_correlation_matrix"]

                p_value += ((temp_mgc_statistic >= test_statistic) * (1/replication_factor))
                p_local_correlation_matrix += ((temp_local_correlation_matrix >=
                                                local_correlation_matrix) * (1/replication_factor))

            p_value_metadata = {"test_statistic": test_statistic,
                                "null_distribution": null_distribution,
                                "p_local_correlation_matrix": p_local_correlation_matrix,
                                "local_correlation_matrix": local_correlation_matrix,
                                "optimal_scale": independence_test_metadata["optimal_scale"]}
        elif self.get_name() == "kendall":
            test_statistic, p_value = kendalltau(matrix_X, matrix_Y)
            p_value_metadata = {"test_statistic": test_statistic}
        elif self.get_name() == "spearman":
            test_statistic, p_value = spearmanr(matrix_X, matrix_Y)
            p_value_metadata = {"test_statistic": test_statistic}
        elif self.get_name() == "pearson":
            test_statistic, p_value = pearsonr(matrix_X.reshape(-1), matrix_Y.reshape(-1))
            p_value_metadata = {"test_statistic": test_statistic}
        else:
            # estimate the null by a permutation test
            test_stats_null = np.zeros(replication_factor)
            for rep in range(replication_factor):
                permuted_x = np.random.permutation(matrix_X)
                permuted_y = np.random.permutation(matrix_Y)
                test_stats_null[rep], _ = self.test_statistic(matrix_X=permuted_x, matrix_Y=permuted_y)
            test_stats_null[0] = test_statistic
            # p-value is the probability of observing more extreme test statistic under the null
            p_value = np.where(test_stats_null >= test_statistic)[0].shape[0] / replication_factor
            p_value_metadata = {"test_statistic": test_statistic,
                                "null_distribution": test_stats_null}

        # Correct for a p_value of 0. This is because, with bootstrapping permutations, a value of 0 is not valid
        if p_value == 0:
            p_value = 1 / replication_factor
        self.p_value_ = p_value
        self.p_value_metadata_ = p_value_metadata
        return p_value, p_value_metadata

    def p_value_block(self, matrix_X, matrix_Y, replication_factor=1000):
        """
        Tests independence between two datasets using block permutation test.

        :param matrix_X: is interpreted as either:

            - a ``[n*n]`` distance matrix, a square matrix with zeros on diagonal for ``n`` samples OR
            - a ``[n*p]`` data matrix, a matrix with ``n`` samples in ``p`` dimensions
        :type matrix_X: 2D numpy.array

        :param matrix_Y: is interpreted as either:

            - a ``[n*n]`` distance matrix, a square matrix with zeros on diagonal for ``n`` samples OR
            - a ``[n*q]`` data matrix, a matrix with ``n`` samples in ``q`` dimensions
        :type matrix_Y: 2D numpy.array

        :param replication_factor: specifies the number of replications to use for
                                   the permutation test. Defaults to ``1000``.
        :type replication_factor: integer

        :return: returns a list of two items, that contains:

            - :p_value: P-value of MGC
            - :metadata: a ``dict`` of metadata with the following keys:
                    - :null_distribution: numpy array representing distribution of test statistic under null.
        :rtype: list

        **Example:**

        >>> import numpy as np
        >>> from mgcpy.independence_tests.mgc.mgc_ts import MGC_TS
        >>>
        >>> X = np.array([0.07487683, -0.18073412, 0.37266440, 0.06074847, 0.76899045,
        ...           0.51862516, -0.13480764, -0.54368083, -0.73812644, 0.54910974]).reshape(-1, 1)
        >>> Y = np.array([-1.31741173, -0.41634224, 2.24021815, 0.88317196, 2.00149312,
        ...           1.35857623, -0.06729464, 0.16168344, -0.61048226, 0.41711113]).reshape(-1, 1)
        >>> mgc_ts = MGC_TS()
        >>> p_value, metadata = mgc_ts.p_value(X, Y, replication_factor = 100)
        """
        assert matrix_X.shape[0] == matrix_Y.shape[0], "Matrices X and Y need to be of dimensions [n, p] and [n, q], respectively, where p can be equal to q"

        # Compute test statistic
        n = matrix_X.shape[0]
        if len(matrix_X.shape) == 1:
            matrix_X = matrix_X.reshape((n, 1))
        if len(matrix_Y.shape) == 1:
            matrix_Y = matrix_Y.reshape((n, 1))
        matrix_X, matrix_Y = compute_distance(matrix_X, matrix_Y, self.compute_distance_matrix)
        test_statistic, test_statistic_metadata = self.test_statistic(matrix_X, matrix_Y)

        # Block bootstrap
        block_size = int(np.ceil(np.sqrt(n)))
        test_stats_null = np.zeros(replication_factor)
        for rep in range(replication_factor):
            # Generate new time series sample for Y
            permuted_indices = np.r_[[np.arange(t, t + block_size) for t in np.random.choice(n, n // block_size + 1)]].flatten()[:n]
            permuted_indices = np.mod(permuted_indices, n)
            permuted_Y = matrix_Y[np.ix_(permuted_indices, permuted_indices)]

            # Compute test statistic
            test_stats_null[rep], _ = self.test_statistic(matrix_X, permuted_Y)

        self.p_value_ = np.sum(np.greater(test_stats_null, test_statistic)) / replication_factor
        if self.p_value == 0.0:
            self.p_value = 1 / replication_factor
        self.p_value_metadata_ = {'null_distribution': test_stats_null}

        return self.p_value_, self.p_value_metadata_