Source code for quara.objects.multinomial_distribution

from functools import reduce
from operator import mul
from typing import List, Tuple, Union

import numpy as np
from scipy.stats import multinomial

from quara.math.probability import validate_prob_dist
from quara.utils.index_util import index_serial_from_index_multi_dimensional
from quara.utils.number_util import to_stream


[docs]class MultinomialDistribution: def __init__( self, ps: np.ndarray, shape: Tuple[int] = None, eps_zero: float = None ): """Constructor Parameters ---------- ps : np.ndarray the probability distribution of multinomial distribution. shape : Tuple[int], optional the shape of multinomial distribution, by default None. if shape is None, set len(ps). eps_zero : float, optional threshold to determine probabilities as zero, by default 1e-8 Raises ------ ValueError some elements of ps are negative numbers. ValueError the sum of ps is not 1. ValueError the size of ps and shape do not match. """ if type(ps) == list: ps = np.array(ps) # validation about probabiity validate_prob_dist(ps, validate_sum=False) if shape is None: self._shape = (len(ps),) else: # validation about data size if len(ps) != reduce(mul, shape): raise ValueError( f"the size of ps({len(ps)}) and shape({shape}) do not match." ) self._shape = shape self._ps = ps self._eps_zero = eps_zero if eps_zero else 1e-8 # adjust probability distribution self._is_zero_dist = True has_zero = False for index, prob in enumerate(self.ps): if prob < self.eps_zero: self._ps[index] = 0.0 has_zero = True else: self._is_zero_dist = False # normalize when is_zero_dist = False and ps has zero if self.is_zero_dist == False and has_zero == True: self._ps = self._ps / np.sum(self._ps) # validation about probabiity if self.is_zero_dist == False: validate_prob_dist(self.ps, validate_sum=True) @property # read only def ps(self) -> np.ndarray: """returns ps. Returns ------- np.ndarray the probability distribution of multinomial distribution. """ return self._ps @property # read only def shape(self) -> Tuple[int]: """returns shape. Returns ------- Tuple[int] the shape of multinomial distribution. """ return self._shape @property # read only def eps_zero(self) -> float: """returns threshold to determine probabilities as zero. Returns ------- float threshold to determine probabilities as zero. """ return self._eps_zero @property # read only def is_zero_dist(self) -> bool: """returns whether all probabilities are zero. Returns ------- bool whether all probabilities are zero. """ return self._is_zero_dist def __getitem__(self, idx) -> np.float64: # Working in progress if type(idx) == int: # One-dimensional access # ex) prob_dist[0] return self._ps[idx] elif type(idx) == tuple: # Multidimensional access # ex) prob_dist[(0, 1)] serial_index = index_serial_from_index_multi_dimensional(self._shape, idx) return self._ps[serial_index] else: raise TypeError("unsupported type {type(idx)}") def __str__(self): desc = f"shape = {self.shape}\n" desc += f"ps = {self._ps}" return desc
[docs] def marginalize( self, outcome_indices_remain: List[int] ) -> "MultinomialDistribution": """marginalize MultinomialDistribution. Parameters ---------- outcome_indices_remain : List[int] calculate the marginal probability of variable ``outcome_indices_remain``. Returns ------- MultinomialDistribution marginalized MultinomialDistribution. Raises ------ ValueError some elements of outcome_indices_remain are out of range. """ # axis of marginalization axis = set(range(len(self.shape))) for index in outcome_indices_remain: if index < 0 or len(self.shape) <= index: raise ValueError( f"some elements of outcome_indices_remain are out of range({index})." ) axis.remove(index) axis = tuple(axis) # marginalize by np.sum marginalized = np.sum(self.ps.reshape(self.shape), axis=axis) new_dist = MultinomialDistribution(marginalized.flatten(), marginalized.shape) return new_dist
[docs] def conditionalize( self, conditional_variable_indices: List[int], conditional_variable_values: List[int], ) -> "MultinomialDistribution": """conditionalize MultinomialDistribution. Parameters ---------- conditional_variable_indices : List[int] indices of the given event of the conditional probability. conditional_variable_values : List[int] values of the given event of the conditional probability. Returns ------- MultinomialDistribution conditionalized MultinomialDistribution. Raises ------ ValueError length of conditional_variable_indices does not equal conditional_variable_values. ValueError conditional_variable_indices has negative numbers. ValueError conditional_variable_values has negative numbers. """ ### validate if len(conditional_variable_indices) != len(conditional_variable_values): raise ValueError( "length of conditional_variable_indices must equal conditional_variable_values." + f" length of conditional_variable_indices is {len(conditional_variable_indices)}." + f" length of conditional_variable_values is {len(conditional_variable_values)}." ) for val in conditional_variable_indices: if val < 0: raise ValueError( "conditional_variable_indices consists of non-negative numbers." + f" conditional_variable_indices={conditional_variable_indices}" ) for val in conditional_variable_values: if val < 0: raise ValueError( "conditional_variable_values consists of non-negative numbers." + f" conditional_variable_values={conditional_variable_values}" ) ### calc new ps # to extract specific columns from old ps, calculate ixgrid of numpy. num_variable = len(self.shape) ix_args = [] for num_var in self.shape: arg = [True] * num_var ix_args.append(arg) for var_index, var_value in zip( conditional_variable_indices, conditional_variable_values ): arg = [False] * self.shape[var_index] arg[var_value] = True ix_args[var_index] = arg ixgrid = np.ix_(*ix_args) # extract specific columns from old ps new_ps = self.ps.reshape(self.shape)[ixgrid] new_ps = new_ps.flatten() / np.sum(new_ps) ### calc new shape new_shape_indices = [True] * num_variable for var_index in conditional_variable_indices: new_shape_indices[var_index] = False new_shape = np.array(self.shape)[new_shape_indices] new_shape = tuple(new_shape) new_dist = MultinomialDistribution(new_ps, new_shape) return new_dist
[docs] def execute_random_sampling( self, num: int, size: int, random_generator: Union[int, np.random.Generator] = None, ) -> List[np.ndarray]: """execute random sampling. Parameters ---------- num : int number of trials size : int size of trials. this size equals the length of the list returned by this function. random_generator : Union[int, np.random.Generator], optional - If the type is int, generates Generator with seed `seed_or_generator` and returned generated random.Generator. - If the type is Generator, returns Generator. - If argument is None, returns np.random. - Default value is None. Returns ------- List[np.ndarray] list of random sampling. """ stream = to_stream(random_generator) samplings = list(multinomial.rvs(num, self.ps, size=size, random_state=stream)) return samplings