import numpy as np
from typing import List, Tuple, Union
from quara.settings import Settings
from quara.utils.number_util import to_stream
def _random_number_to_data(probdist: np.ndarray, random_number: np.float64) -> int:
    """Map a random number to an outcome index of ``probdist``.

    Walks the running total of ``probdist`` and returns the index of the
    first entry at which the running total becomes strictly greater than
    ``random_number``.  When no running total exceeds ``random_number``
    the last index, ``len(probdist) - 1``, is returned.
    """
    threshold = 0.0
    for outcome, weight in enumerate(probdist):
        threshold = threshold + weight
        if threshold > random_number:
            return outcome

    # the running total never exceeded random_number: fall back to the last index.
    last_outcome = len(probdist) - 1
    return last_outcome
def generate_data_from_prob_dist(
    prob_dist: np.ndarray,
    data_num: int,
    seed_or_stream: Union[int, np.random.RandomState] = None,
    atol: float = None,
) -> List[int]:
    """generates random data from a probability distribution.

    the data is a sequence (list) of measurement outcomes.
    measurement outcomes are integers.
    ``0 <= each measurement outcomes < len(prob_dist)``.
    length of the data equals ``data_num``.

    Parameters
    ----------
    prob_dist : np.ndarray
        a probability distribution used to generate random data.
        every entry must be non-negative (zero is accepted).
    data_num : int
        length of the data.
    seed_or_stream : Union[int, np.random.RandomState], optional
        forwarded to ``to_stream`` to obtain the random stream.
        If the type is int, it is assumed to be a seed used to generate random data.
        If the type is RandomState, it is used to generate random data.
        If argument is None, np.random is used to generate random data.
        Default value is None.
    atol : float, optional
        the absolute tolerance parameter, uses :func:`~quara.settings.Settings.get_atol`
        when it is None (the default).
        an explicit value, including ``0.0``, is used as given.
        checks ``absolute(the sum of probabilities - 1) <= atol`` in this function.

    Returns
    -------
    List[int]
        generated data.

    Raises
    ------
    ValueError
        there is a negative probability.
    ValueError
        the sum of probabilities does not equal 1.
    """
    # whether each probability is a non-negative number.
    for prob in prob_dist:
        if prob < 0:
            raise ValueError(
                f"each probability must be a positive number. there is {prob} in a probability distribution"
            )

    # whether the sum of probabilities equals 1.
    sum_prob_dist = np.sum(prob_dist)
    # compare against None so that an explicit atol of 0.0 is honoured
    # instead of being replaced by the default tolerance.
    if atol is None:
        atol = Settings.get_atol()
    if not np.isclose(sum_prob_dist, 1, atol=atol, rtol=0.0):
        raise ValueError(
            f"the sum of probabilities must equal 1. the sum of probabilities is {sum_prob_dist}"
        )

    # generate random numbers. 0 <= rand_val[i] < 1 for all i = 0,..., data_num - 1
    stream = to_stream(seed_or_stream)
    rand_val = stream.rand(data_num)

    # use np.frompyfunc to apply the function '_random_number_to_data' to np.ndarray
    def curried_random_number_to_data(random_number):
        return _random_number_to_data(prob_dist, random_number)

    _random_number_to_data_func = np.frompyfunc(curried_random_number_to_data, 1, 1)
    return _random_number_to_data_func(rand_val).tolist()
def generate_dataset_from_prob_dists(
    prob_dists: List[np.ndarray],
    data_nums: List[int],
    seeds_or_streams: List[Union[int, np.random.RandomState]] = None,
) -> List[List[int]]:
    """generates a random dataset, one data sequence per probability distribution.

    the i-th entry of the result is produced by calling
    ``generate_data_from_prob_dist`` with ``prob_dists[i]``, ``data_nums[i]``
    and, when ``seeds_or_streams`` is given, ``seeds_or_streams[i]``.

    Parameters
    ----------
    prob_dists : List[np.ndarray]
        a list of probability distributions.
    data_nums : List[int]
        a list of data lengths, one per probability distribution.
    seeds_or_streams : List[Union[int, np.random.RandomState]], optional
        a list of seeds or streams, one per probability distribution.
        If argument is None, None is passed for every probability distribution.
        Default value is None.

    Returns
    -------
    List[List[int]]
        generated dataset.

    Raises
    ------
    ValueError
        the length of ``prob_dists`` does not equal the length of ``data_nums``.
    ValueError
        ``seeds_or_streams`` is not None and the length of ``prob_dists`` does not equal the length of ``seeds_or_streams``.
    """
    # the lengths of prob_dists and data_nums must agree.
    if len(prob_dists) != len(data_nums):
        raise ValueError(
            f"the length of prob_dists must equal the length of data_nums. the length of prob_dists is {len(prob_dists)}. the length of data_nums is {len(data_nums)}"
        )

    # when seeds/streams are supplied, there must be one per probability distribution.
    if seeds_or_streams is not None and len(prob_dists) != len(seeds_or_streams):
        raise ValueError(
            f"the length of prob_dists must equal the length of seeds_or_streams. the length of prob_dists is {len(prob_dists)}. the length of seeds_or_streams is {len(seeds_or_streams)}"
        )

    def _source_at(position):
        # None means "no seed/stream given" for every distribution.
        if seeds_or_streams is None:
            return None
        return seeds_or_streams[position]

    return [
        generate_data_from_prob_dist(dist, length, _source_at(position))
        for position, (dist, length) in enumerate(zip(prob_dists, data_nums))
    ]
def calc_empi_dist_sequence(
    measurement_num: int, data: List[int], num_sums: List[int]
) -> List[Tuple[int, np.ndarray]]:
    """calculates empirical distributions.

    for each ``num_sum`` in ``num_sums``, uses the first ``num_sum`` elements
    of ``data`` to calculate an empirical distribution.
    ``data`` is scanned once; scanning stops as soon as the last element of
    ``num_sums`` has been processed, so later elements of ``data`` are not inspected.

    Parameters
    ----------
    measurement_num : int
        number of measurements.
    data : List[int]
        data of measurement outcomes.
    num_sums : List[int]
        a list of the range of ``data`` to calculate empirical distributions.
        must be a strictly increasing sequence of positive integers.
        if it is empty, an empty list is returned.

    Returns
    -------
    List[Tuple[int, np.ndarray]]
        a list of (the range of ``data``, empirical distribution).
        the dtype of each empirical distribution is np.float64.

    Raises
    ------
    ValueError
        ``measurement_num`` is not non-negative integer.
    ValueError
        the first element of ``num_sums`` is not a positive integer.
    ValueError
        there is an element of ``num_sums`` that is not less than or equal to length of ``data``.
    ValueError
        there is an element of ``data`` that is not non-negative and less than ``measurement_num``.
    ValueError
        ``num_sums`` is not an increasing sequence.

    Examples
    --------
    >>> measurement_num = 2
    >>> data = [1, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1]
    >>> num_sums = [5, 10, 20]
    >>> empi_dist = calc_empi_dist_sequence(measurement_num, data, num_sums)
    >>> empi_dist
    [(5, array([0.4, 0.6])), (10, array([0.3, 0.7])), (20, array([0.3, 0.7]))]
    """
    # whether measurement_num is non-negative integer.
    if measurement_num < 0:
        raise ValueError(
            f"measurement_num must be non-negative integer. measurement_num is {measurement_num}"
        )

    empi_dists = []
    cumulative_frequency = np.zeros((measurement_num), dtype=int)

    # take next num_sum from 'num_sums'
    if len(num_sums) == 0:
        return empi_dists
    next_num_sum = num_sums[0]
    next_num_sum_position = 0

    # a non-positive first checkpoint can never be reached by the scan below
    # (the scan compares against index + 1 >= 1), which previously produced an
    # empty result and silently dropped every later checkpoint.
    # later checkpoints are covered by the strictly-increasing check.
    if next_num_sum < 1:
        raise ValueError(
            f"each number of num_sums must be a positive integer. num_sums of index {next_num_sum_position} is {next_num_sum}"
        )

    # whether each number of num_sums is less than or equal to length of data.
    if next_num_sum > len(data):
        raise ValueError(
            f"each number of num_sums must be less than or equal to length of data. num_sums of index {next_num_sum_position} is {next_num_sum}. length of data is {len(data)}"
        )

    for index, d in enumerate(data):
        # whether 0 <= d < 'measurement_num'.
        if not 0 <= d < measurement_num:
            raise ValueError(
                f"for each data d, it must be 0 <= d < 'measurement_num'. data of index {index} is {d}"
            )
        cumulative_frequency[d] += 1

        # nothing to emit until the number of scanned elements hits the checkpoint.
        if index + 1 != next_num_sum:
            continue

        # calculate empirical distribution (the division creates a new float array,
        # so later updates of the counter do not affect stored results).
        empidist = cumulative_frequency / (index + 1)
        empi_dists.append((next_num_sum, empidist))

        # take next num_sum from 'num_sums'
        if next_num_sum_position + 1 == len(num_sums):
            # end of 'num_sums'
            return empi_dists

        former_num_sum = next_num_sum
        next_num_sum_position += 1
        next_num_sum = num_sums[next_num_sum_position]

        # whether each number of num_sums is less than or equal to length of data.
        if next_num_sum > len(data):
            raise ValueError(
                f"each number of num_sums must be less than or equal to length of data. num_sums of index {next_num_sum_position} is {next_num_sum}"
            )

        # whether num_sums must be an increasing sequence.
        if former_num_sum >= next_num_sum:
            raise ValueError(
                f"num_sums must be an increasing sequence. num_sums contains the following subsequence: {former_num_sum}, {next_num_sum}"
            )

    # defensive fallback: with the validations above every checkpoint is reached
    # inside the loop, so this is not expected to be hit.
    return empi_dists
def calc_empi_dists_sequence(
    measurement_nums: List[int],
    dataset: List[List[int]],
    list_num_sums: List[List[int]],
) -> List[List[Tuple[int, np.ndarray]]]:
    """calculates a sequence of empirical distributions for each data of a dataset.

    the i-th entry of the result is the return value of ``calc_empi_dist_sequence``
    called with ``measurement_nums[i]``, ``dataset[i]`` and ``list_num_sums[i]``.

    Parameters
    ----------
    measurement_nums : List[int]
        a list of measurement_num
    dataset : List[List[int]]
        a dataset
    list_num_sums : List[List[int]]
        a list of num_sums

    Returns
    -------
    List[List[Tuple[int, np.ndarray]]]
        a sequence of empirical distributions, one list per data of the dataset.

    Raises
    ------
    ValueError
        the length of ``measurement_nums`` does not equal the length of ``dataset``.
    ValueError
        the length of ``measurement_nums`` does not equal the length of ``list_num_sums``.
    """
    # measurement_nums and dataset must have the same length.
    if len(measurement_nums) != len(dataset):
        raise ValueError(
            f"the length of measurement_nums must equal the length of dataset. the length of measurement_nums is {len(measurement_nums)}. the length of dataset is {len(dataset)}"
        )

    # measurement_nums and list_num_sums must have the same length.
    if len(measurement_nums) != len(list_num_sums):
        raise ValueError(
            f"the length of measurement_nums must equal the length of list_num_sums. the length of measurement_nums is {len(measurement_nums)}. the length of list_num_sums is {len(list_num_sums)}"
        )

    grouped_arguments = zip(measurement_nums, dataset, list_num_sums)
    return [
        calc_empi_dist_sequence(outcome_count, samples, checkpoints)
        for outcome_count, samples, checkpoints in grouped_arguments
    ]