import numpy as np
from typing import List, Tuple
from quara.settings import Settings
def _random_number_to_data(probdist: np.ndarray, random_number: np.float64) -> int:
    """converts a random number to a measurement outcome.

    walks the running sum of ``probdist`` from index 0 and picks the first index
    at which the running sum becomes strictly greater than ``random_number``.

    Parameters
    ----------
    probdist : np.ndarray
        probabilities of the outcomes.
    random_number : np.float64
        the number to convert to an outcome.

    Returns
    -------
    int
        index of the selected outcome.
        ``len(probdist) - 1`` if the running sum never exceeds ``random_number``.
    """
    last_outcome = len(probdist) - 1
    running_total = 0.0
    outcome = 0
    while outcome <= last_outcome:
        running_total += probdist[outcome]
        if random_number < running_total:
            return outcome
        outcome += 1
    # reached only when random_number is not below the total of probdist.
    return last_outcome
def generate_data_from_prob_dist(
    prob_dist: np.ndarray, data_num: int, seed: int = None, atol: float = None
) -> List[int]:
    """generates random data from a probability distribution.

    the data is a sequence (list) of measurement outcomes.
    measurement outcomes are integers.
    ``0 <= each measurement outcomes < len(prob_dist)``.
    length of the data equals ``data_num``.

    each outcome is the first index at which the cumulative sum of ``prob_dist``
    becomes strictly greater than a uniform random number in ``[0, 1)``.
    if no such index exists, the last index is used.

    Parameters
    ----------
    prob_dist : np.ndarray
        a probability distribution used to generate random data.
    data_num : int
        length of the data.
    seed : int, optional
        a seed used to generate random data, by default None.
        when given, it is passed to ``np.random.seed``, which reseeds NumPy's global random state.
    atol : float, optional
        the absolute tolerance parameter, uses :func:`~quara.settings.Settings.get_atol` by default.
        checks ``absolute(the sum of probabilities - 1) <= atol`` in this function.
        an explicit ``0.0`` is honoured.

    Returns
    -------
    List[int]
        generated data.

    Raises
    ------
    ValueError
        there is a negative probability.
    ValueError
        the sum of probabilities does not equal 1 within ``atol``.
    """
    # whether each probability is a non-negative number (zero is accepted).
    for prob in prob_dist:
        if prob < 0:
            raise ValueError(
                f"each probability must be a positive number. there is {prob} in a probability distribution"
            )

    # compare with None, not truthiness, so that atol=0.0 is not replaced by the default.
    if atol is None:
        atol = Settings.get_atol()

    # whether the sum of probabilities equals 1.
    sum_prob_dist = np.sum(prob_dist)
    if not np.isclose(sum_prob_dist, 1, atol=atol, rtol=0.0):
        raise ValueError(
            f"the sum of probabilities must equal 1. the sum of probabilities is {sum_prob_dist}"
        )

    if seed is not None:
        np.random.seed(seed)

    # generate random numbers. 0 <= rand_val[i] < 1 for all i = 0,..., data_num - 1
    rand_val = np.random.rand(data_num)

    # side="right" returns the first index i with rand_val < cumulative_probs[i].
    # the cumulative sums are non-decreasing because negative entries were rejected above.
    cumulative_probs = np.cumsum(prob_dist)
    outcomes = np.searchsorted(cumulative_probs, rand_val, side="right")

    # a random number not below the last cumulative sum maps to the last outcome.
    return np.minimum(outcomes, len(prob_dist) - 1).tolist()
def generate_dataset_from_prob_dists(
    prob_dists: List[np.ndarray],
    data_nums: List[int],
    seeds: List[int] = None,
) -> List[List[int]]:
    """generates a random dataset from probability distributions.

    the i-th element of the dataset is the data returned by
    ``generate_data_from_prob_dist`` for ``prob_dists[i]``, ``data_nums[i]`` and,
    when ``seeds`` is given, ``seeds[i]``.

    Parameters
    ----------
    prob_dists : List[np.ndarray]
        a list of prob_dist.
    data_nums : List[int]
        a list of data_num.
    seeds : List[int], optional
        a list of seed, by default None.
        when None, every data is generated with ``seed=None``.

    Returns
    -------
    List[List[int]]
        generated dataset.

    Raises
    ------
    ValueError
        the length of ``prob_dists`` does not equal the length of ``data_nums``.
    ValueError
        ``seeds`` is not None and the length of ``prob_dists`` does not equal the length of ``seeds``.
    """
    dist_count = len(prob_dists)

    # prob_dists and data_nums are paired element by element.
    if dist_count != len(data_nums):
        raise ValueError(
            f"the length of prob_dists must equal the length of data_nums. the length of prob_dists is {len(prob_dists)}. the length of data_nums is {len(data_nums)}"
        )

    # seeds, when supplied, must also provide one entry per distribution.
    if seeds is not None and dist_count != len(seeds):
        raise ValueError(
            f"the length of prob_dists must equal the length of seeds. the length of prob_dists is {len(prob_dists)}. the length of seeds is {len(seeds)}"
        )

    return [
        generate_data_from_prob_dist(
            prob_dist, data_num, None if seeds is None else seeds[position]
        )
        for position, (prob_dist, data_num) in enumerate(zip(prob_dists, data_nums))
    ]
def calc_empi_dist_sequence(
    measurement_num: int, data: List[int], num_sums: List[int]
) -> List[Tuple[int, np.ndarray]]:
    """calculates empirical distributions.

    for each ``num_sum`` in ``num_sums``, the empirical distribution is the relative
    frequency of every outcome among the first ``num_sum`` elements of ``data``.
    ``data`` is scanned once and scanning stops as soon as the last ``num_sum`` is reached,
    so elements of ``data`` after that point are not inspected.

    Parameters
    ----------
    measurement_num : int
        number of measurements.
    data : List[int]
        data of measurement outcomes.
    num_sums : List[int]
        a list of the range of ``data`` to calculate empirical distributions.
        must be a strictly increasing sequence of positive integers.
        if empty, an empty list is returned.

    Returns
    -------
    List[Tuple[int, np.ndarray]]
        a list of (the range of ``data``, empirical distribution).
        the dtype of each empirical distribution is np.float64.

    Raises
    ------
    ValueError
        ``measurement_num`` is not non-negative integer.
    ValueError
        there is an element of ``num_sums`` that is not less than or equal to length of ``data``.
    ValueError
        the first element of ``num_sums`` is not positive.
    ValueError
        there is an inspected element of ``data`` that is not non-negative and less than ``measurement_num``.
    ValueError
        ``num_sums`` is not an increasing sequence.

    Examples
    --------
    >>> measurement_num = 2
    >>> data = [1, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1]
    >>> num_sums = [5, 10, 20]
    >>> empi_dist = calc_empi_dist_sequence(measurement_num, data, num_sums)
    >>> empi_dist
    [(5, array([0.4, 0.6])), (10, array([0.3, 0.7])), (20, array([0.3, 0.7]))]
    """
    # whether measurement_num is non-negative integer.
    if measurement_num < 0:
        raise ValueError(
            f"measurement_num must be non-negative integer. measurement_num is {measurement_num}"
        )

    empi_dists = []
    cumulative_frequency = np.zeros((measurement_num), dtype=int)

    # nothing is requested.
    if len(num_sums) == 0:
        return empi_dists

    # take the first num_sum from 'num_sums'
    next_num_sum_position = 0
    next_num_sum = num_sums[next_num_sum_position]

    # whether each number of num_sums is less than or equal to length of data.
    if next_num_sum > len(data):
        raise ValueError(
            f"each number of num_sums must be less than or equal to length of data. num_sums of index {next_num_sum_position} is {next_num_sum}. length of data is {len(data)}"
        )

    # a non-positive first num_sum can never be matched by 'index + 1' below,
    # which would silently produce an empty result. reject it explicitly.
    if next_num_sum <= 0:
        raise ValueError(
            f"each number of num_sums must be a positive integer. num_sums of index {next_num_sum_position} is {next_num_sum}"
        )

    for index, d in enumerate(data):
        # whether 0 <= d < 'measurement_num'.
        if not 0 <= d < measurement_num:
            raise ValueError(
                f"for each data d, it must be 0 <= d < 'measurement_num'. data of index {index} is {d}"
            )
        cumulative_frequency[d] += 1

        # the requested prefix length has not been reached yet.
        if index + 1 != next_num_sum:
            continue

        # calculate empirical distribution. the division creates a new float array,
        # so later increments of cumulative_frequency do not affect stored results.
        empidist = cumulative_frequency / (index + 1)
        empi_dists.append((next_num_sum, empidist))

        # take next num_sum from 'num_sums'
        former_num_sum = next_num_sum
        next_num_sum_position += 1
        if next_num_sum_position == len(num_sums):
            # end of 'num_sums'
            return empi_dists
        next_num_sum = num_sums[next_num_sum_position]

        # whether each number of num_sums is less than or equal to length of data.
        if next_num_sum > len(data):
            raise ValueError(
                f"each number of num_sums must be less than or equal to length of data. num_sums of index {next_num_sum_position} is {next_num_sum}"
            )

        # whether num_sums must be an increasing sequence.
        if former_num_sum >= next_num_sum:
            raise ValueError(
                f"num_sums must be an increasing sequence. num_sums contains the following subsequence: {former_num_sum}, {next_num_sum}"
            )

    return empi_dists
def calc_empi_dists_sequence(
    measurement_nums: List[int],
    dataset: List[List[int]],
    list_num_sums: List[List[int]],
) -> List[List[Tuple[int, np.ndarray]]]:
    """calculates a sequence of empirical distributions for a whole dataset.

    the i-th element of the result is what ``calc_empi_dist_sequence`` returns for
    ``measurement_nums[i]``, ``dataset[i]`` and ``list_num_sums[i]``.

    Parameters
    ----------
    measurement_nums : List[int]
        a list of measurement_num
    dataset : List[List[int]]
        a dataset
    list_num_sums : List[List[int]]
        a list of num_sums

    Returns
    -------
    List[List[Tuple[int, np.ndarray]]]
        a sequence of empirical distributions, one inner list per element of ``dataset``.

    Raises
    ------
    ValueError
        the length of ``measurement_nums`` does not equal the length of ``dataset``.
    ValueError
        the length of ``measurement_nums`` does not equal the length of ``list_num_sums``.
    """
    expected_len = len(measurement_nums)

    # the three lists are consumed in lockstep, so their lengths must agree.
    if expected_len != len(dataset):
        raise ValueError(
            f"the length of measurement_nums must equal the length of dataset. the length of measurement_nums is {len(measurement_nums)}. the length of dataset is {len(dataset)}"
        )
    if expected_len != len(list_num_sums):
        raise ValueError(
            f"the length of measurement_nums must equal the length of list_num_sums. the length of measurement_nums is {len(measurement_nums)}. the length of list_num_sums is {len(list_num_sums)}"
        )

    return [
        calc_empi_dist_sequence(measurement_num, data, num_sums)
        for measurement_num, data, num_sums in zip(
            measurement_nums, dataset, list_num_sums
        )
    ]