from typing import List, Optional, Union
import copy
from collections import Counter
import dataclasses
import pickle
import time
from pathlib import Path
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
from quara.objects.qoperation import QOperation
from quara.objects.state import State
from quara.objects.povm import Povm
from quara.objects.gate import Gate
from quara.minimization_algorithm.minimization_algorithm import (
MinimizationAlgorithm,
MinimizationAlgorithmOption,
)
from quara.loss_function.probability_based_loss_function import (
ProbabilityBasedLossFunction,
ProbabilityBasedLossFunctionOption,
)
from quara.protocol.qtomography.standard.loss_minimization_estimator import (
LossMinimizationEstimator,
)
from quara.protocol.qtomography.standard.standard_qst import StandardQst
from quara.protocol.qtomography.standard.standard_qpt import StandardQpt
from quara.protocol.qtomography.standard.standard_povmt import StandardPovmt
from quara.protocol.qtomography.standard.standard_qtomography_estimator import (
StandardQTomographyEstimator,
StandardQTomographyEstimationResult,
)
from quara.simulation.generation_setting import QOperationGenerationSettings
from quara.simulation.depolarized_qoperation_generation_setting import (
DepolarizedQOperationGenerationSetting,
)
from quara.simulation.random_effective_lindbladian_generation_setting import (
RandomEffectiveLindbladianGenerationSetting,
)
class StandardQTomographySimulationSetting:
    """Bundle of the inputs used to run one standard tomography simulation.

    The constructor keeps shallow copies (``copy.copy``) of ``true_object``,
    ``tester_objects``, ``estimator``, ``loss`` and ``algo``; all other
    arguments are stored exactly as given.
    """

    def __init__(
        self,
        name: str,
        true_object: "QOperation",
        tester_objects: List["QOperation"],
        estimator: "Estimator",
        seed: int,
        n_rep: int,
        num_data: List[int],
        schedules: Union[str, List[List[int]]],
        eps_proj_physical: float,
        loss=None,
        loss_option=None,
        algo=None,
        algo_option=None,
    ) -> None:
        """Store the simulation inputs on the instance.

        Parameters
        ----------
        name : str
            Label of this setting; shown as ``Name`` by ``__str__``.
        true_object : QOperation
            Object to be estimated (shallow-copied).
        tester_objects : List[QOperation]
            Tester objects (the list itself is shallow-copied, its items
            are shared with the caller).
        estimator : Estimator
            Estimator instance (shallow-copied).
        seed : int
            Seed value, stored as given.
        n_rep : int
            Number of repetitions, stored as given.
        num_data : List[int]
            Numbers of data, stored as given.
        schedules : Union[str, List[List[int]]]
            Schedules, stored as given.
        eps_proj_physical : float
            Stored as given.
        loss, algo : optional
            Loss function / minimization algorithm (shallow-copied).
        loss_option, algo_option : optional
            Options for ``loss`` / ``algo``, stored without copying.
        """
        self.name = name
        self.true_object = copy.copy(true_object)
        self.tester_objects = copy.copy(tester_objects)
        self.estimator = copy.copy(estimator)
        self.loss = copy.copy(loss)
        self.loss_option = loss_option
        self.algo = copy.copy(algo)
        self.algo_option = algo_option
        self.seed = seed
        self.n_rep = n_rep
        self.num_data = num_data
        self.eps_proj_physical = eps_proj_physical
        self.schedules = schedules

    def __str__(self):
        """Return a multi-line, human-readable summary of the setting.

        Tester objects are summarised as ``<ClassName> x <count>``; the
        estimator, loss and algorithm are shown by class name only.
        """
        counter = Counter(t.__class__.__name__ for t in self.tester_objects)
        tester_summary = ", ".join(f"{k} x {v}" for k, v in counter.items())
        loss = None if self.loss is None else self.loss.__class__.__name__
        algo = None if self.algo is None else self.algo.__class__.__name__

        lines = [
            f"Name: {self.name}",
            f"True Object: {self.true_object}",
            # The ": " separator was previously missing, which glued the
            # label to the first class name ("Tester ObjectsState x 3").
            f"Tester Objects: {tester_summary}",
            f"n_rep: {self.n_rep}",
            f"num_data: {self.num_data}",
            f"Estimator: {self.estimator.__class__.__name__}",
            f"eps_proj_physical: {self.eps_proj_physical}",
            f"Loss: {loss}",
            f"Algo: {algo}",
        ]
        return "\n".join(lines)
@dataclasses.dataclass
class NoiseSetting:
    """Description of how to build a generation setting for one QOperation.

    ``method`` selects the generation-setting class and ``para`` supplies
    the extra keyword arguments for its constructor.
    """

    # Forwarded to the generation setting as ``qoperation_base``.
    qoperation_base: Union[QOperation, str]
    # Lookup key: "depolarized" or "random_effective_lindbladian".
    method: str
    # Keyword arguments unpacked (``**para``) into the generation-setting
    # constructor, so this must be a mapping (it was mis-annotated as bool).
    para: dict
    # Forwarded to the generation setting as ``ids``; None when unspecified.
    ids: Optional[List[int]] = None

    def to_generation_setting(
        self, c_sys: "CompositeSystem"
    ) -> "QOperationGenerationSetting":
        """Instantiate the generation setting selected by ``self.method``.

        Parameters
        ----------
        c_sys : CompositeSystem
            Passed through to the generation setting as ``c_sys``.

        Returns
        -------
        QOperationGenerationSetting
            Instance of the class mapped to ``self.method``, built from
            ``qoperation_base``, ``c_sys``, the entries of ``para`` and
            ``ids``.

        Raises
        ------
        NotImplementedError
            If ``self.method`` is not one of the supported method names.
        """
        name2class_map = {
            "depolarized": DepolarizedQOperationGenerationSetting,
            "random_effective_lindbladian": RandomEffectiveLindbladianGenerationSetting,
        }
        if self.method not in name2class_map:
            message = f"noise_setting.method='{self.method}' is not implemented."
            raise NotImplementedError(message)

        target_class = name2class_map[self.method]
        return target_class(
            qoperation_base=self.qoperation_base,
            c_sys=c_sys,
            **self.para,
            ids=self.ids,
        )
@dataclasses.dataclass
class EstimatorTestSetting:
    """Settings shared by a group of estimator test cases.

    The ``*_list`` / ``case_names`` / ``estimators`` fields are indexed by
    a case index in ``to_simulation_setting``.
    """

    # Noise setting used to generate the true object.
    true_object: NoiseSetting
    # Noise settings used to generate the tester objects.
    tester_objects: List[NoiseSetting]
    seed: int
    n_rep: int
    num_data: List[int]
    # Not read by any method of this class.
    n_sample: int
    schedules: Union[str, List[List[int]]]
    # Per-case values, looked up with ``case_index``.
    case_names: List[str]
    estimators: List["Estimator"]
    eps_proj_physical_list: List[float]
    # Each entry is indexed as (algo, algo_option).
    algo_list: List[tuple]
    # Each entry is indexed as (loss, loss_option).
    loss_list: List[tuple]
    parametrizations: List[bool]
    # Passed to every NoiseSetting.to_generation_setting call.
    c_sys: "CompositeSystem"

    def to_pickle(self, path: Union[str, Path]) -> None:
        """Pickle this setting to ``path``, creating parent directories."""
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        with destination.open("wb") as stream:
            pickle.dump(self, stream)

    def to_generation_settings(self) -> QOperationGenerationSettings:
        """Convert the noise settings into ``QOperationGenerationSettings``.

        The true object is converted first, then each tester object in
        order, all with ``self.c_sys``.
        """
        c_sys = self.c_sys
        true_setting = self.true_object.to_generation_setting(c_sys)
        tester_settings = []
        for noise_setting in self.tester_objects:
            tester_settings.append(noise_setting.to_generation_setting(c_sys))
        return QOperationGenerationSettings(
            true_setting=true_setting, tester_settings=tester_settings
        )

    def to_simulation_setting(
        self,
        true_object: "QOperation",
        tester_objects: List["QOperation"],
        case_index: int,
    ) -> StandardQTomographySimulationSetting:
        """Build the simulation setting for the case at ``case_index``.

        Parameters
        ----------
        true_object : QOperation
            True object handed to the simulation setting.
        tester_objects : List[QOperation]
            Tester objects handed to the simulation setting.
        case_index : int
            Index into ``case_names``, ``estimators``, ``loss_list``,
            ``algo_list`` and ``eps_proj_physical_list``.

        Returns
        -------
        StandardQTomographySimulationSetting
            Setting combining the per-case values with the shared
            ``n_rep``, ``seed``, ``num_data`` and ``schedules``.
        """
        name = self.case_names[case_index]
        estimator = self.estimators[case_index]
        loss_entry = self.loss_list[case_index]
        loss, loss_option = loss_entry[0], loss_entry[1]
        algo_entry = self.algo_list[case_index]
        algo, algo_option = algo_entry[0], algo_entry[1]
        eps_proj_physical = self.eps_proj_physical_list[case_index]

        return StandardQTomographySimulationSetting(
            name=name,
            true_object=true_object,
            tester_objects=tester_objects,
            estimator=estimator,
            seed=self.seed,
            n_rep=self.n_rep,
            num_data=self.num_data,
            schedules=self.schedules,
            eps_proj_physical=eps_proj_physical,
            loss=loss,
            loss_option=loss_option,
            algo=algo,
            algo_option=algo_option,
        )
@dataclasses.dataclass
class SimulationResult:
    """Outcome of one simulation case together with its check results."""

    # Must contain "test_setting_index", "sample_index" and "case_index".
    result_index: dict
    simulation_setting: StandardQTomographySimulationSetting
    estimation_results: List["EstimationResult"]
    # Must contain "total_result" and "results" (a sequence of dicts with
    # "name" and "result"; "Consistency" entries also carry "detail").
    check_result: dict

    def to_pickle(self, path: Union[str, Path]) -> None:
        """Pickle this result to ``path``, creating parent directories."""
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        with destination.open("wb") as stream:
            pickle.dump(self, stream)

    def to_dict(self) -> dict:
        """Flatten the result into a single dict.

        Returns
        -------
        dict
            The three index values, the setting name and ``total_result``,
            followed by one ``name -> result`` entry per check and a
            ``"warning"`` entry. The warning text is non-empty only when
            a "Consistency" check reports a falsy ``possibly_ok``.
        """
        index = self.result_index
        row = {
            "test_setting_index": index["test_setting_index"],
            "sample_index": index["sample_index"],
            "case_index": index["case_index"],
            "name": self.simulation_setting.name,
            "total_result": self.check_result["total_result"],
        }

        per_check = {}
        warning_parts = []
        for check in self.check_result["results"]:
            per_check[check["name"]] = check["result"]
            if check["name"] != "Consistency":
                continue
            possibly_ok = check["detail"]["possibly_ok"]
            to_be_checked = check["detail"]["to_be_checked"]
            if not possibly_ok:
                warning_parts.append(
                    f"Consistency: possibly_ok={possibly_ok}, to_be_checked={to_be_checked}"
                )

        row.update(per_check)
        row["warning"] = "".join(warning_parts)
        return row
# common
def execute_simulation(
    qtomography: "StandardQTomography",
    simulation_setting: StandardQTomographySimulationSetting,
) -> List[StandardQTomographyEstimationResult]:
    """Run the simulation described by ``simulation_setting``.

    Delegates to ``generate_empi_dists_and_calc_estimate`` using the
    setting's ``n_rep`` as the iteration count.

    Parameters
    ----------
    qtomography : StandardQTomography
        Tomography object passed through to the estimation helper.
    simulation_setting : StandardQTomographySimulationSetting
        Source of the true object, data counts, estimator, loss and
        algorithm configuration.

    Returns
    -------
    List[StandardQTomographyEstimationResult]
        Whatever the estimation helper returns for the given ``n_rep``.
    """
    setting = simulation_setting
    return generate_empi_dists_and_calc_estimate(
        qtomography=qtomography,
        true_object=setting.true_object,
        num_data=setting.num_data,
        estimator=setting.estimator,
        loss=setting.loss,
        loss_option=setting.loss_option,
        algo=setting.algo,
        algo_option=setting.algo_option,
        iteration=setting.n_rep,
    )
# common
def _generate_empi_dists_and_calc_estimate(
    qtomography: "StandardQTomography",
    true_object: QOperation,
    num_data: List[int],
    estimator=StandardQTomographyEstimator,
    loss: ProbabilityBasedLossFunction = None,
    loss_option: ProbabilityBasedLossFunctionOption = None,
    algo: MinimizationAlgorithm = None,
    algo_option: MinimizationAlgorithmOption = None,
) -> StandardQTomographyEstimationResult:
    """Generate one empirical-distribution sequence and estimate from it.

    The loss/algorithm arguments are forwarded only when ``estimator`` is
    a ``LossMinimizationEstimator``; other estimators are called without
    them. Computation time is always requested.

    Returns
    -------
    StandardQTomographyEstimationResult
        Result of ``estimator.calc_estimate_sequence``.
    """
    empi_dists_seq = qtomography.generate_empi_dists_sequence(true_object, num_data)

    # Only loss-minimization estimators receive the optimisation inputs.
    if isinstance(estimator, LossMinimizationEstimator):
        minimization_kwargs = {
            "loss": loss,
            "loss_option": loss_option,
            "algo": algo,
            "algo_option": algo_option,
        }
    else:
        minimization_kwargs = {}

    return estimator.calc_estimate_sequence(
        qtomography,
        empi_dists_seq,
        **minimization_kwargs,
        is_computation_time_required=True,
    )
def re_estimate(
    test_setting: EstimatorTestSetting, result: SimulationResult, n_rep_index: int
) -> StandardQTomographyEstimationResult:
    """Re-run the estimation for one repetition of a stored result.

    The empirical distributions recorded in
    ``result.estimation_results[n_rep_index].data`` are fed back into a
    deep copy of the estimator stored in the result's simulation setting.

    Parameters
    ----------
    test_setting : EstimatorTestSetting
        Supplies ``parametrizations``, indexed by the result's case index.
    result : SimulationResult
        Stored result providing the case index, the recorded data and the
        simulation setting.
    n_rep_index : int
        Index of the repetition to re-estimate.

    Returns
    -------
    StandardQTomographyEstimationResult
        Result of ``estimator.calc_estimate_sequence``.
    """
    case_index = result.result_index["case_index"]
    empi_dists_seq = result.estimation_results[n_rep_index].data

    sim_setting = result.simulation_setting
    qtomography = generate_qtomography(
        sim_setting,
        para=test_setting.parametrizations[case_index],
    )

    # Deep copies keep the rerun from touching objects held by ``result``.
    estimator = copy.deepcopy(sim_setting.estimator)
    if isinstance(estimator, LossMinimizationEstimator):
        # Mirror _generate_empi_dists_and_calc_estimate: loss-minimization
        # estimators are given the loss/algorithm stored in the setting.
        # Previously these were dropped here, so the rerun did not use the
        # same configuration as the original simulation.
        return estimator.calc_estimate_sequence(
            qtomography,
            empi_dists_seq,
            loss=copy.deepcopy(sim_setting.loss),
            loss_option=sim_setting.loss_option,
            algo=copy.deepcopy(sim_setting.algo),
            algo_option=sim_setting.algo_option,
            is_computation_time_required=True,
        )

    return estimator.calc_estimate_sequence(
        qtomography,
        empi_dists_seq,
        is_computation_time_required=True,
    )
def re_estimate_sequence(
    test_setting: EstimatorTestSetting, result: SimulationResult
) -> List[StandardQTomographyEstimationResult]:
    """Re-estimate every repetition of a stored result.

    Calls ``re_estimate`` once for each index in
    ``range(result.simulation_setting.n_rep)``.

    Returns
    -------
    List[StandardQTomographyEstimationResult]
        One estimation result per repetition, in index order.
    """
    n_rep = result.simulation_setting.n_rep
    return [
        re_estimate(test_setting, result, rep_index) for rep_index in range(n_rep)
    ]
def re_estimate_sequence_from_path(
    test_setting_path: Union[str, Path], result_path: Union[str, Path]
) -> List[StandardQTomographyEstimationResult]:
    """Load a pickled result and test setting, then re-estimate.

    Parameters
    ----------
    test_setting_path : Union[str, Path]
        Path of the pickled test setting.
    result_path : Union[str, Path]
        Path of the pickled simulation result (read first).

    Returns
    -------
    List[StandardQTomographyEstimationResult]
        Output of ``re_estimate_sequence`` for the loaded objects.
    """

    # NOTE(review): pickle.load executes arbitrary code from the file;
    # only point this at files from a trusted source.
    def _unpickle(path):
        with open(path, "rb") as stream:
            return pickle.load(stream)

    result = _unpickle(result_path)
    test_setting = _unpickle(test_setting_path)
    return re_estimate_sequence(test_setting, result)
def re_estimate_sequence_from_index(
    root_dir: str, test_setting_index: int, sample_index: int, case_index: int
) -> List[StandardQTomographyEstimationResult]:
    """Re-estimate a stored result located by its indices under ``root_dir``.

    The files are expected at
    ``<root_dir>/<test_setting_index>/<sample_index>/case_<case_index>_result.pickle``
    and ``<root_dir>/<test_setting_index>/test_setting.pickle``.

    Returns
    -------
    List[StandardQTomographyEstimationResult]
        Output of ``re_estimate_sequence_from_path`` for those two files.
    """
    result_file = Path(root_dir).joinpath(
        str(test_setting_index),
        str(sample_index),
        f"case_{case_index}_result.pickle",
    )
    test_setting_file = Path(root_dir).joinpath(
        str(test_setting_index), "test_setting.pickle"
    )
    return re_estimate_sequence_from_path(test_setting_file, result_file)
# common
def generate_empi_dists_and_calc_estimate(
    qtomography: "StandardQTomography",
    true_object: QOperation,
    num_data: List[int],
    estimator: StandardQTomographyEstimator,
    loss: ProbabilityBasedLossFunction = None,
    loss_option: ProbabilityBasedLossFunctionOption = None,
    algo: MinimizationAlgorithm = None,
    algo_option: MinimizationAlgorithmOption = None,
    iteration: Optional[int] = None,
) -> Union[
    StandardQTomographyEstimationResult,
    List[StandardQTomographyEstimationResult],
]:
    """Generate empirical distributions and estimate, once or repeatedly.

    Parameters
    ----------
    iteration : Optional[int]
        ``None`` runs a single estimation and returns its result; an
        integer runs that many estimations (with a ``tqdm`` progress bar)
        and returns them as a list.

    All other parameters are forwarded unchanged to
    ``_generate_empi_dists_and_calc_estimate``.

    Returns
    -------
    StandardQTomographyEstimationResult or list of them
        A single result when ``iteration`` is ``None``, otherwise a list
        with one result per iteration.
    """

    def _estimate_once() -> StandardQTomographyEstimationResult:
        # One data-generation + estimation pass with the captured inputs.
        return _generate_empi_dists_and_calc_estimate(
            qtomography,
            true_object,
            num_data,
            estimator,
            loss=loss,
            loss_option=loss_option,
            algo=algo,
            algo_option=algo_option,
        )

    if iteration is not None:
        return [_estimate_once() for _ in tqdm(range(iteration))]
    return _estimate_once()
# Data Convert
def generate_qtomography(
    sim_setting: StandardQTomographySimulationSetting, para: bool
) -> "StandardQTomography":
    """Build the tomography object matching the setting's true object.

    Parameters
    ----------
    sim_setting : StandardQTomographySimulationSetting
        Provides ``true_object``, ``tester_objects``, ``eps_proj_physical``
        and ``seed``.
    para : bool
        Passed as ``on_para_eq_constraint``.

    Returns
    -------
    StandardQTomography
        ``StandardQst`` for a ``State``, ``StandardPovmt`` for a ``Povm``
        and ``StandardQpt`` for a ``Gate`` true object.

    Raises
    ------
    TypeError
        If the exact type of the true object is none of the three above
        (the object is printed before raising).
    """
    true_object = sim_setting.true_object
    tester_objects = sim_setting.tester_objects
    eps_proj_physical = sim_setting.eps_proj_physical
    seed = sim_setting.seed

    # Keyword arguments shared by all three tomography constructors.
    shared_kwargs = {
        "on_para_eq_constraint": para,
        "seed": seed,
        "eps_proj_physical": eps_proj_physical,
    }

    # Exact-type comparison on purpose: subclasses are not accepted.
    object_type = type(true_object)
    if object_type == State:
        return StandardQst(tester_objects, **shared_kwargs)
    elif object_type == Povm:
        return StandardPovmt(
            tester_objects,
            **shared_kwargs,
            num_outcomes=len(true_object.vecs),
        )
    elif object_type == Gate:
        # Split the testers by exact type into input states and POVMs.
        states = []
        for tester in tester_objects:
            if type(tester) == State:
                states.append(tester)
        povms = []
        for tester in tester_objects:
            if type(tester) == Povm:
                povms.append(tester)
        return StandardQpt(states=states, povms=povms, **shared_kwargs)

    message = f"type of sim_setting.true_object must be State, Povm, or Gate, not {type(true_object)}"
    print(f"{sim_setting.true_object}")
    raise TypeError(message)