import itertools
from itertools import product
from typing import List, Tuple, Union
import numpy as np
from scipy.linalg import block_diag
from quara.objects.state import State
from quara.objects.povm import Povm
from quara.objects.mprocess import MProcess
from quara.objects.qoperation import QOperation
from quara.objects.qoperations import SetQOperations
from quara.protocol.qtomography.standard.standard_qtomography import StandardQTomography
from quara.protocol.qtomography.standard.standard_qpt import calc_c_qpt
from quara.qcircuit.experiment import Experiment
from quara.utils.number_util import to_stream
[docs]class StandardQmpt(StandardQTomography):
_estimated_qoperation_type = MProcess
def __init__(
self,
states: List[State],
povms: List[Povm],
num_outcomes: int,
is_physicality_required: bool = False,
is_estimation_object: bool = False,
on_para_eq_constraint: bool = False,
eps_proj_physical: float = None,
eps_truncate_imaginary_part: float = None,
seed_data: int = None,
schedules: Union[str, List[List[Tuple]]] = "all",
):
# Make Experment with states
if type(schedules) == str:
self._validate_schedules_str(schedules)
if schedules == "all":
schedules = []
for i, j in product(range(len(states)), range(len(povms))):
schedules.append([("state", i), ("mprocess", 0), ("povm", j)])
experiment = Experiment(
states=states,
mprocesses=[None],
povms=povms,
schedules=schedules,
seed_data=seed_data,
)
self._validate_schedules(schedules)
self._num_outcomes = num_outcomes
# Make SetQOperation
size = states[0].dim ** 2
hss = [np.zeros((size, size), dtype=np.float64) for _ in range(num_outcomes)]
mprocess = MProcess(
c_sys=states[0].composite_system,
hss=hss,
is_physicality_required=is_physicality_required,
is_estimation_object=is_estimation_object,
on_para_eq_constraint=on_para_eq_constraint,
eps_proj_physical=eps_proj_physical,
eps_truncate_imaginary_part=eps_truncate_imaginary_part,
)
set_qoperations = SetQOperations(states=[], mprocesses=[mprocess], povms=[])
super().__init__(experiment, set_qoperations)
# validate
if not self.is_valid_experiment():
raise ValueError(
"the experiment is not valid. all CompositeSystem of testers must have same ElementalSystems."
)
if on_para_eq_constraint:
self._num_variables = num_outcomes * mprocess.dim ** 4 - mprocess.dim ** 2
else:
self._num_variables = num_outcomes * mprocess.dim ** 4
# create map
self._map_experiment_to_setqoperations = {("mprocess", 0): ("mprocess", 0)}
self._map_setqoperations_to_experiment = {("mprocess", 0): ("mprocess", 0)}
# calc and set coeff0s, coeff1s, matA and vecB
self._set_coeffs(experiment, on_para_eq_constraint)
self._on_para_eq_constraint = on_para_eq_constraint
self._template_qoperation = self._set_qoperations.mprocesses[0]
def _validate_schedules(self, schedules):
for i, schedule in enumerate(schedules):
if (
schedule[0][0] != "state"
or schedule[1][0] != "mprocess"
or schedule[2][0] != "povm"
):
message = f"schedules[{i}] is invalid. "
message += 'Schedule of Qmpt must be in format as \'[("state", state_index), ("mprocess", 0), ("povm", povm_index)]\', '
message += f"not '{schedule}'."
raise ValueError(message)
if schedule[1][1] != 0:
message = f"schedules[{i}] is invalid."
message += f"MProcess index of schedule in Qmpt must be 0: {schedule}"
raise ValueError(message)
@property
def on_para_eq_constraint(self): # read only
return self._on_para_eq_constraint
@property
def num_outcomes_estimate(self):
return self._num_outcomes
[docs] def num_outcomes(self, schedule_index: int) -> int:
"""returns the number of outcomes of probability distribution of a schedule index.
Parameters
----------
schedule_index: int
Returns
-------
int
the number of outcomes
"""
assert schedule_index >= 0
assert schedule_index < self.num_schedules
povm_index = self._experiment.schedules[schedule_index][2][1]
num_outcomes_povm = len(self._experiment._povms[povm_index].vecs)
num_outcomes_mprocess = self._num_outcomes
return num_outcomes_povm * num_outcomes_mprocess
[docs] def estimation_object_type(self) -> type:
return MProcess
[docs] def is_valid_experiment(self) -> bool:
is_ok_states = self.is_all_same_composite_systems(self._experiment.states)
is_ok_povms = self.is_all_same_composite_systems(self._experiment.povms)
return is_ok_states and is_ok_povms
[docs] def generate_empi_dist(
self,
schedule_index: int,
mprocess: MProcess,
num_sum: int,
seed_or_generator: Union[int, np.random.Generator] = None,
) -> Tuple[int, np.ndarray]:
"""Generate empirical distribution using the data generated from probability distribution of specified schedules.
Parameters
----------
schedule_index : int
schedule index.
mprocess: MProcess
true object.
num_sum : int
the number of data to use to generate the experience distributions for each schedule.
seed_or_generator : Union[int, np.random.Generator], optional
If the type is int, it is assumed to be a seed used to generate random data.
If the type is Generator, it is used to generate random data.
If argument is None, np.random is used to generate random data.
Default value is None.
Returns
-------
Tuple[int, np.ndarray]
Generated empirical distribution.
"""
tmp_experiment = self._experiment.copy()
target_index = self._get_target_index(tmp_experiment, schedule_index)
tmp_experiment.mprocesses[target_index] = mprocess
stream = to_stream(seed_or_generator)
empi_dist_seq = tmp_experiment.generate_empi_dist_sequence(
schedule_index, [num_sum], seed_or_generator=stream
)
return empi_dist_seq[0]
[docs] def generate_empi_dists_sequence(
self,
mprocess: MProcess,
num_sums: List[int],
seed_or_generator: Union[int, np.random.Generator] = None,
) -> List[List[Tuple[int, np.ndarray]]]:
tmp_experiment = self._experiment.copy()
list_num_sums = [num_sums] * self._num_schedules
list_num_sums_tmp = [list(num_sums) for num_sums in zip(*list_num_sums)]
for schedule_index in range(len(tmp_experiment.schedules)):
# Get the index corresponding to True and replace it.
target_index = self._get_target_index(tmp_experiment, schedule_index)
tmp_experiment.mprocesses[target_index] = mprocess
stream = to_stream(seed_or_generator)
empi_dists_sequence_tmp = tmp_experiment.generate_empi_dists_sequence(
list_num_sums_tmp, seed_or_generator=stream
)
empi_dists_sequence = [
list(empi_dists) for empi_dists in zip(*empi_dists_sequence_tmp)
]
return empi_dists_sequence
def _testers(self) -> List[Union[State, Povm]]:
return self.experiment.states + self.experiment.povms
def _get_target_index(self, experiment: Experiment, schedule_index: int) -> int:
schedule = experiment.schedules[schedule_index]
# 0:state -> 1:mprocess -> 2:povm
MPROCESS_ITEM_INDEX = 1
target_index = schedule[MPROCESS_ITEM_INDEX][1]
return target_index
def _set_coeffs(self, experiment: Experiment, on_para_eq_constraint: bool) -> None:
# coeff0s and coeff1s
self._coeffs_0th = dict() # b
self._coeffs_1st = dict() # α
_, _, c_qpt_dict = calc_c_qpt(
states=self._experiment.states,
povms=self._experiment.povms,
schedules=self._experiment.schedules,
on_para_eq_constraint=on_para_eq_constraint,
)
self._c_qpt_dict = c_qpt_dict
dim = self._experiment.states[0].dim
schedule_n = len(self._experiment.schedules)
for schedule_index in range(schedule_n):
c_qpt = c_qpt_dict[schedule_index]
a_qmpt, b_qmpt = cqpt_to_cqmpt(
c_qpt,
m_mprocess=self.num_outcomes_estimate,
dim=dim,
on_para_eq_constraint=on_para_eq_constraint,
)
for element_index, a in enumerate(a_qmpt):
self._coeffs_1st[(schedule_index, element_index)] = a
self._coeffs_0th[(schedule_index, element_index)] = b_qmpt[
element_index
]
[docs] def generate_empi_dists(
self,
mprocess: MProcess,
num_sum: int,
seed_or_generator: Union[int, np.random.Generator] = None,
) -> List[Tuple[int, np.ndarray]]:
"""Generate empirical distributions using the data generated from probability distributions of all schedules.
see :func:`~quara.protocol.qtomography.qtomography.QTomography.generate_empi_dists`
"""
tmp_experiment = self._experiment.copy()
for schedule_index in range(len(tmp_experiment.schedules)):
target_index = self._get_target_index(tmp_experiment, schedule_index)
tmp_experiment.mprocesses[target_index] = mprocess
num_sums = [num_sum] * self._num_schedules
stream = to_stream(seed_or_generator)
empi_dist_seq = tmp_experiment.generate_empi_dists_sequence(
[num_sums], seed_or_generator=stream
)
empi_dists = list(itertools.chain.from_iterable(empi_dist_seq))
return empi_dists
[docs] def convert_var_to_qoperation(self, var: np.ndarray) -> MProcess:
template = self._template_qoperation
mprocess = template.generate_from_var(var=var)
return mprocess
[docs] def generate_empty_estimation_obj_with_setting_info(self) -> QOperation:
empty_estimation_obj = self._set_qoperations.mprocesses[0]
return empty_estimation_obj.copy()
[docs]def cqpt_to_cqmpt(
c_qpt: np.ndarray, m_mprocess: int, dim: int, on_para_eq_constraint: bool
) -> List[np.ndarray]:
c_list = [c_qpt] * m_mprocess
if on_para_eq_constraint:
if len(c_qpt.shape) < 2:
c_qpt = c_qpt.reshape((1, c_qpt.shape[0]))
d_qpt = c_qpt[:, : dim ** 2]
e_qpt = c_qpt[:, dim ** 2 :]
c_list = [c_qpt] * (m_mprocess - 1)
a_0_left = block_diag(*c_list)
a_0_right = np.zeros((a_0_left.shape[0], e_qpt.shape[1]))
a_0 = np.hstack([a_0_left, a_0_right])
d_dash_right_size = (d_qpt.shape[0], c_qpt.shape[1] - d_qpt.shape[1])
d_dash = np.hstack([-d_qpt, np.zeros(d_dash_right_size)])
a_1 = np.hstack([d_dash] * (m_mprocess - 1) + [e_qpt])
a_qmpt = np.vstack([a_0, a_1])
b_0 = np.zeros(d_qpt.shape[0] * (m_mprocess - 1))
b_1 = d_qpt.T[0]
b_qmpt = np.hstack([b_0, b_1])
else:
c_list = [c_qpt] * m_mprocess
c_qmpt = block_diag(*c_list)
a_qmpt = c_qmpt
b_qmpt = np.zeros(c_qmpt.shape[0])
return a_qmpt, b_qmpt