# Source code for pyllars.matrix_utils

""" Helpers for working with (sparse) 2d matrices
"""
import logging
logger = logging.getLogger(__name__)

import numpy as np

import scipy.io
import scipy.sparse

from typing import List, Optional, Tuple

###
# Matrix operation helpers
###

def col_op(m, op):
    """ Apply `op` to each column in the matrix

    Parameters
    ----------
    m : object
        The (2d) matrix. It is passed unchanged as the first argument to `op`.

    op : callable
        A function accepting the matrix and an `axis` keyword argument
        (for example, `np.sum` or `np.mean`)

    Returns
    -------
    result : object
        Whatever `op(m, axis=0)` returns
    """
    # axis 0 collapses the rows, leaving one value per column
    column_axis = 0
    return op(m, axis=column_axis)
def col_sum(m):
    """ Calculate the sum of each column in the matrix

    Parameters
    ----------
    m : array-like
        The (2d) matrix

    Returns
    -------
    column_sums : object
        The result of `np.sum(m, axis=0)`
    """
    # axis 0 collapses the rows, leaving one total per column
    column_totals = np.sum(m, axis=0)
    return column_totals
def col_sum_mean(m:np.ndarray, return_var:bool=False) -> float:
    """ Calculate the mean of the sum of each column in the matrix

    Optionally, the variance of the column sums is returned as well.

    Parameters
    ----------
    m : numpy.ndarray
        The (2d) matrix

    return_var : bool
        Whether to also calculate the variance of the column sums

    Returns
    -------
    mean : float
        The mean of the column sums in the matrix

    variance : float
        Only if `return_var` is `True`: the variance of the column sums. In
        that case, the return value is the tuple `(mean, variance)`.
    """
    sums = col_sum(m)
    average = np.mean(sums)

    # the common case: only the mean was requested
    if not return_var:
        return average

    return average, np.var(sums)
def normalize_columns(matrix:np.ndarray) -> np.ndarray:
    """ Normalize the columns of the given (dense) matrix

    Each entry is divided by the sum of the absolute values in its column.
    The input is not modified; a new array is returned.

    Parameters
    ----------
    matrix : numpy.ndarray
        The (2d) matrix

    Returns
    -------
    normalized_matrix : numpy.ndarray
        The matrix normalized such that the absolute values in each column
        sum to 1
    """
    # one normalization coefficient per column; the resulting 1d array
    # broadcasts across the rows in the division below
    col_sums = np.sum(np.abs(matrix), axis=0)

    # NOTE: the original divided an undefined name (`m`) here, which raised a
    # NameError on every call
    normalized_matrix = np.divide(matrix, col_sums)
    return normalized_matrix
def row_op(m, op):
    """ Apply `op` to each row in the matrix

    Parameters
    ----------
    m : object
        The (2d) matrix. It is passed unchanged as the first argument to `op`.

    op : callable
        A function accepting the matrix and an `axis` keyword argument
        (for example, `np.sum` or `np.mean`)

    Returns
    -------
    result : object
        Whatever `op(m, axis=1)` returns
    """
    # axis 1 collapses the columns, leaving one value per row
    row_axis = 1
    return op(m, axis=row_axis)
def row_sum(m):
    """ Calculate the sum of each row in the matrix

    Parameters
    ----------
    m : array-like
        The (2d) matrix

    Returns
    -------
    row_sums : object
        The result of `np.sum(m, axis=1)`
    """
    # axis 1 collapses the columns, leaving one total per row
    row_totals = np.sum(m, axis=1)
    return row_totals
def row_sum_mean(m:np.ndarray, var:bool=False, return_var:bool=False) -> float:
    """ Calculate the mean of the sum of each row in the matrix

    Optionally, the variance of the row sums is returned as well.

    Parameters
    ----------
    m : numpy.ndarray
        The (2d) matrix

    var : bool
        Whether to also calculate the variance of the row sums. This is the
        original name of the flag and is kept for backward compatibility.

    return_var : bool
        Alias of `var`, matching the name used by `col_sum_mean`. The
        variance is returned if either flag is `True`.

    Returns
    -------
    mean : float
        The mean of the row sums in the matrix

    variance : float
        Only if `var` or `return_var` is `True`: the variance of the row sums.
        In that case, the return value is the tuple `(mean, variance)`.
    """
    # sum across the columns (axis 1), giving one total per row
    row_sums = np.sum(m, axis=1)
    mean = np.mean(row_sums)

    # NOTE: the original tested an undefined name (`return_var`) while the
    # signature only declared `var`, so every call raised a NameError; it also
    # overwrote the flag with the variance. Both names are accepted now, and
    # the variance is kept in its own local.
    if var or return_var:
        variance = np.var(row_sums)
        return mean, variance

    return mean
def normalize_rows(matrix:np.ndarray) -> np.ndarray:
    """ Normalize the rows of the given (dense) matrix

    Each entry is divided by the sum of the absolute values in its row.

    Parameters
    ----------
    matrix : numpy.ndarray
        The (2d) matrix

    Returns
    -------
    normalized_matrix : numpy.ndarray
        The matrix normalized such that the absolute values in each row
        sum to 1
    """
    row_totals = np.abs(matrix).sum(axis=1)

    # turn the 1d totals into a column so that each row is divided by its
    # own total when broadcasting
    as_column = row_totals[:, np.newaxis]
    return np.divide(matrix, as_column)
###
# Sparse matrix helpers
###
def get_dense_row(
        matrix:scipy.sparse.spmatrix,
        row:int,
        dtype=float,
        max_length:Optional[int]=None) -> np.ndarray:
    """ Extract `row` from the sparse `matrix`

    Parameters
    ----------
    matrix : scipy.sparse.spmatrix
        The sparse matrix

    row : int
        The 0-based row index

    dtype : type
        The element type of the returned array. Anything accepted as the
        `dtype` argument of :func:`numpy.asarray` can be used.

    max_length : typing.Optional[int]
        The maximum number of columns to include in the returned row.

    Returns
    -------
    row : numpy.ndarray
        The specified row as a 1d numpy array. The result is 1d even when
        `matrix` has a single column (i.e., it is essentially a sparse column
        vector).
    """
    dense_row = matrix.getrow(row).todense()

    # Flatten rather than squeeze. Squeezing a single-column row produces a
    # 0-d array, which the original `isinstance(d, dtype)` guard never
    # detected (a 0-d array is not an instance of `dtype`) and which cannot
    # be sliced below. That guard also raised a TypeError whenever `dtype`
    # was not a class (e.g., a `numpy.dtype` instance).
    d = np.ravel(np.asarray(dense_row, dtype=dtype))

    if max_length is not None:
        d = d[:max_length]

    return d
def sparse_matrix_to_dense(sparse_matrix:scipy.sparse.spmatrix) -> np.ndarray:
    """ Convert `sparse_matrix` to a dense numpy array

    Parameters
    ----------
    sparse_matrix : scipy.sparse.spmatrix
        The sparse scipy matrix

    Returns
    -------
    dense_matrix : numpy.ndarray
        The dense (2d) numpy array
    """
    # wrap the result of `todense` in `np.array` so that the caller always
    # gets a plain array object
    as_dense = sparse_matrix.todense()
    return np.array(as_dense)
def sparse_matrix_to_list(sparse_matrix:scipy.sparse.spmatrix) -> List:
    """ Convert `sparse_matrix` to a list of "sparse row vectors"

    In this context, a "sparse row vector" is simply a sparse matrix with
    dimensionality (1, sparse_matrix.shape[1]).

    Parameters
    ----------
    sparse_matrix : scipy.sparse.spmatrix
        The sparse scipy matrix. Each row is extracted with
        `sparse_matrix[i]`, so the format must support row indexing.

    Returns
    -------
    list_of_sparse_row_vectors : typing.List[scipy.sparse.spmatrix]
        The list of sparse row vectors, in row order
    """
    num_rows = sparse_matrix.shape[0]
    return [sparse_matrix[row_index] for row_index in range(num_rows)]
def write_sparse_matrix(
        target:str,
        a:scipy.sparse.spmatrix,
        compress:bool=True,
        **kwargs) -> None:
    """ Write `a` to the file `target` in matrix market format

    This function is a drop-in replacement for :func:`scipy.io.mmwrite`. The
    only difference is that it gzip compresses the output by default. It
    *does not* alter the file extension, which should likely end in "mtx.gz"
    except in special circumstances.

    The `gzip` module is imported only when `compress` is `True`.

    Parameters
    ----------
    target : str
        The complete path to the output file, including file extension

    a : scipy.sparse.spmatrix
        The sparse matrix

    compress : bool
        Whether to compress the output

    **kwargs : <key>=<value> pairs
        These are passed through to :func:`scipy.io.mmwrite`. Please see the
        scipy documentation for more details.

    Returns
    -------
    None, but the matrix is written to disk
    """
    if not compress:
        # uncompressed: hand the path directly to scipy
        scipy.io.mmwrite(target, a, **kwargs)
        return

    import gzip

    # compressed: scipy writes into the open gzip stream, which is closed
    # (and flushed) when the `with` block exits
    with gzip.open(target, 'wb') as target_gz:
        scipy.io.mmwrite(target_gz, a, **kwargs)
###
# Other helpers
###
def matrix_multiply(
        m1:np.ndarray,
        m2:np.ndarray,
        m3:np.ndarray) -> np.ndarray:
    """ Multiply the three matrices

    The product `m1 * m2 * m3` is evaluated in whichever order creates the
    smaller intermediate matrix from the first multiplication.

    Parameters
    ----------
    m{1,2,3} : numpy.ndarray
        The (2d) matrices

    Returns
    -------
    product_matrix : numpy.ndarray
        The product of the three input matrices

    Raises
    ------
    ValueError
        If the inner dimensions of `m1` and `m2`, or of `m2` and `m3`, do
        not match
    """
    # validate the inner dimensions before doing any work
    if m1.shape[1] != m2.shape[0]:
        template = ("The inner dimension between the first and second "
            "matrices do not match. {} and {}")
        raise ValueError(template.format(m1.shape, m2.shape))

    if m2.shape[1] != m3.shape[0]:
        template = ("The inner dimensions between the second and third "
            "matrices do not match. {} and {}")
        raise ValueError(template.format(m2.shape, m3.shape))

    # number of entries in the intermediate matrix for each evaluation order
    left_first_entries = m1.shape[0] * m2.shape[1]    # (m1 . m2) first
    right_first_entries = m2.shape[0] * m3.shape[1]   # (m2 . m3) first

    if left_first_entries < right_first_entries:
        return np.dot(np.dot(m1, m2), m3)

    # ties go to the right-first order
    return np.dot(m1, np.dot(m2, m3))
def permute_matrix(
        m:np.ndarray,
        is_flat:bool=False,
        shape:Optional[Tuple[int]]=None) -> np.ndarray:
    """ Randomly permute the entries of the matrix

    The values are flattened (unless they already are), shuffled with
    :func:`numpy.random.permutation`, and reshaped. For reproducibility, the
    random seed of numpy should be set **before** calling this function.

    Parameters
    ----------
    m : numpy.ndarray
        The matrix (tensor, etc.)

    is_flat : bool
        Whether the matrix values have already been flattened. If they have
        been, then the desired shape must be passed.

    shape : typing.Optional[typing.Tuple]
        The shape of the output matrix, if `m` is already flattened. If this
        is `None`, the shape of `m` is used.

    Returns
    -------
    permuted_m : numpy.ndarray
        A copy of `m` (with the requested shape) with the values randomly
        permuted
    """
    # fall back to the shape of the input when none was given
    output_shape = m.shape if shape is None else shape

    values = m if is_flat else np.reshape(m, -1)

    # `permutation` returns a shuffled copy, so the input is left untouched
    shuffled = np.random.permutation(values)
    return np.reshape(shuffled, output_shape)