Source code for quara.simulation.standard_qtomography_simulation_flow

from typing import List, Union, Dict
import copy
import time
from pathlib import Path
import json
import pickle
import itertools

import numpy as np
from numpy.random import Generator, MT19937, SeedSequence
import pandas as pd
import joblib

from quara.simulation import standard_qtomography_simulation_report as report

from quara.simulation.standard_qtomography_simulation_check import (
    StandardQTomographySimulationCheck,
)
from quara.simulation import standard_qtomography_simulation as sim
from quara.simulation.standard_qtomography_simulation import (
    EstimatorTestSetting,
    SimulationResult,
)
from quara.objects.qoperation_typical import generate_qoperation_object
from quara.protocol.qtomography.standard.loss_minimization_estimator import (
    LossMinimizationEstimator,
)


[docs]def execute_simulation_case_unit( test_setting, true_object, tester_objects, empi_dists_sequences, # case_index: int, sample_index: int, test_setting_index: int, root_dir: str, exec_sim_check: dict = None, n_jobs: int = 1, data_saving: str = "on_memory", is_computation_time_required: bool = True, is_detailed_results_required: bool = False, ) -> SimulationResult: # Generate QTomographySimulationSetting sim_setting = test_setting.to_simulation_setting( true_object, tester_objects, case_index ) print(f"Case {case_index}: {sim_setting.name}") org_sim_setting = sim_setting.copy() # # Generate QTomography # # Do not set the random number seed when initializing qtomography. # # Use the random number stream later when generating the empirical distribution. qtomography = sim.generate_qtomography( sim_setting, para=test_setting.parametrizations[case_index], init_with_seed=False, ) # TODO: remove # sim_result = [] # Execute if data_saving == "on_memory": sim_result = sim.execute_estimation( qtomography=qtomography, simulation_setting=sim_setting, empi_dists_sequences=empi_dists_sequences, n_jobs=n_jobs, is_computation_time_required=is_computation_time_required, is_detailed_results_required=is_detailed_results_required, ) else: sim_result = sim.execute_estimation_with_saved_empi_dists_sequences( qtomography=qtomography, simulation_setting=sim_setting, dir_path_empi_dists_sequences=Path(root_dir) / str(sample_index) / "empi_dists_sequences", n_jobs=n_jobs, is_computation_time_required=is_computation_time_required, is_detailed_results_required=is_detailed_results_required, ) # Simulation Check sim_check = StandardQTomographySimulationCheck(sim_result) check_result = sim_check.execute_all( show_detail=False, with_detail=True, exec_check=exec_sim_check ) # Show result if not check_result["total_result"]: start_red = "\033[31m" end_color = "\033[0m" print(f"Total Result: {start_red}NG{end_color}") result_index = dict( test_setting_index=test_setting_index, sample_index=sample_index, case_index=case_index, ) # Add to SimulationResult sim_result.simulation_setting = org_sim_setting sim_result.result_index = result_index sim_result.check_result = check_result # Save write_result_case_unit(sim_result, root_dir=root_dir) return sim_result
[docs]def execute_simulation_sample_unit( test_setting, generation_settings, test_setting_index, sample_index, root_dir, pdf_mode: str = "only_ng", stream_qoperation: Union[int, np.random.Generator] = None, exec_sim_check: Dict[str, bool] = None, parallel_mode: Dict[str, int] = None, data_saving: str = "on_memory", is_computation_time_required: bool = True, is_detailed_results_required: bool = False, ) -> List[SimulationResult]: # Generate sample _f = generation_settings.true_setting.generate if "seed_or_generator" in _f.__code__.co_varnames[: _f.__code__.co_argcount]: true_object = generation_settings.true_setting.generate( seed_or_generator=stream_qoperation ) tester_objects = [ tester_setting.generate(stream_qoperation) for tester_setting in generation_settings.tester_settings ] else: # True Object if test_setting.true_object.method is None: name = test_setting.true_object.qoperation_base[1] mode_name = test_setting.true_object.qoperation_base[0] true_object = generate_qoperation_object( mode=mode_name, object_name=mode_name, name=name, c_sys=test_setting.c_sys, ) else: true_object = generation_settings.true_setting.generate() # Tester Objects tester_objects = [] for i, tester_noise_setting in enumerate(test_setting.tester_objects): if tester_noise_setting.method is None: name = tester_noise_setting.qoperation_base[1] mode_name = tester_noise_setting.qoperation_base[0] tester_objects.append( generate_qoperation_object( mode=mode_name, object_name=mode_name, name=name, c_sys=test_setting.c_sys, ) ) else: generation_setting = generation_settings.tester_settings[i] tester_objects.append(generation_setting.generate()) true_object = true_object[0] if type(true_object) == tuple else true_object tester_objects = [ tester[0] if type(tester) == tuple else tester for tester in tester_objects ] results = [] case_n = len(test_setting.case_names) # Generate QTomography # Do not set the random number seed when initializing qtomography. # Use the random number stream later when generating the empirical distribution. dummy_case_index = 0 tmp_sim_setting = test_setting.to_simulation_setting( true_object, tester_objects, dummy_case_index ) tmp_qtomography = sim.generate_qtomography( tmp_sim_setting, para=test_setting.parametrizations[dummy_case_index], # dummy init_with_seed=False, ) # Generate a random number stream to generate the empirical distribution. if type(parallel_mode) == dict and "per_data_generation" in parallel_mode: per_data_generation_n_jobs = parallel_mode["per_data_generation"] else: per_data_generation_n_jobs = 1 sg = SeedSequence(tmp_sim_setting.seed_data) # The default for RandomState is MT19937, so use this. # Change it if necessary(PCG64, PCG64DXSM, etc.). stream_datas = [Generator(MT19937(s)) for s in sg.spawn(tmp_sim_setting.n_rep)] empi_dists_sequences = joblib.Parallel( n_jobs=per_data_generation_n_jobs, verbose=2 )( [ joblib.delayed(tmp_qtomography.generate_empi_dists_sequence)( true_object, tmp_sim_setting.num_data, s ) for s in stream_datas ] ) if data_saving == "on_storage": raise NotImplementedError() if type(parallel_mode) == dict and "per_estimator_unit" in parallel_mode: per_estimator_unit_n_jobs = parallel_mode["per_estimator_unit"] else: per_estimator_unit_n_jobs = 1 if type(parallel_mode) == dict and "per_estimator_execution" in parallel_mode: per_estimator_execution_n_jobs = parallel_mode["per_estimator_execution"] else: per_estimator_execution_n_jobs = 1 results = joblib.Parallel(n_jobs=per_estimator_unit_n_jobs, verbose=2)( [ joblib.delayed(execute_simulation_case_unit)( test_setting, true_object, tester_objects, empi_dists_sequences, case_index, sample_index, test_setting_index, root_dir, exec_sim_check, per_estimator_execution_n_jobs, is_computation_time_required=is_computation_time_required, is_detailed_results_required=is_detailed_results_required, ) for case_index in range(case_n) ] ) # Save write_result_sample_unit(results, root_dir=root_dir) # Save PDF if pdf_mode == "all": write_pdf_report(results, root_dir, display_items=exec_sim_check) elif pdf_mode == "only_ng": total_results = [r.check_result["total_result"] for r in results] print(f"total_result={np.all(total_results)}") if not np.all(total_results): write_pdf_report(results, root_dir, display_items=exec_sim_check) elif pdf_mode == "none": pass else: message = "`pdf_mode` must be 'all', 'only_ng', or 'none'." raise ValueError(message) return results
[docs]def execute_simulation_test_setting_unit( test_setting, test_setting_index, root_dir, exec_sim_check: Dict[str, bool] = None, pdf_mode: str = "only_ng", parallel_mode: Dict[str, int] = None, data_saving: str = "on_memory", is_computation_time_required: bool = True, is_detailed_results_required: bool = False, ) -> List[SimulationResult]: generation_settings = test_setting.to_generation_settings() n_sample = test_setting.n_sample results = [] sg = SeedSequence(test_setting.seed_qoperation) # The default for RandomState is MT19937, so use this. # Change it if necessary(PCG64, PCG64DXSM, etc.). gens_qperations = [Generator(MT19937(s)) for s in sg.spawn(n_sample)] if type(parallel_mode) == dict and "per_sample_unit" in parallel_mode: n_jobs = parallel_mode["per_sample_unit"] else: n_jobs = 1 results = joblib.Parallel(n_jobs=n_jobs, verbose=2)( [ joblib.delayed(execute_simulation_sample_unit)( test_setting, generation_settings, test_setting_index, sample_index, root_dir, pdf_mode, random_gen, exec_sim_check, parallel_mode, data_saving, is_computation_time_required=is_computation_time_required, is_detailed_results_required=is_detailed_results_required, ) for sample_index, random_gen in enumerate(gens_qperations) ] ) results = list(itertools.chain.from_iterable(results)) # Save write_result_test_setting_unit(results, root_dir, test_setting_index) return results
[docs]def execute_simulation_test_settings( test_settings: List[EstimatorTestSetting], root_dir: str, pdf_mode: str = "only_ng", exec_sim_check: Dict[str, bool] = None, parallel_mode: Dict[str, int] = None, data_saving: str = "on_memory", is_computation_time_required: bool = True, is_detailed_results_required: bool = False, ) -> List[SimulationResult]: """ Run a simulation by specifying multiple EstimationTestSettings. Parameters ---------- test_settings : List[EstimatorTestSetting] List of EstimationTestSetting root_dir : str Root folder where the results will be saved. pdf_mode : str, optional Settings for PDF reporting of simulation results, by default "only_ng". "all": output all. "only_ng": output only if the result of the simulation check is NG. "none": do not output exec_sim_check : Dict[str, bool], optional Items to check for simulation results, by default None. The key of the dictionary is the name of the check item ("consistency", "mse_of_estimators", "mse_of_empi_dists", "physicality_violation"), and the value is whether or not to check (True/False). This check uses `StandardQTomographySimulationCheck`. parallel_mode : Dict[str, int], optional Parallelization settings, by default None. For this parallelization, joblib is used. The key of the dictionary is the type of process to parallelize ("per_sample_unit", "per_data_generation", "per_estimator_unit", "per_estimator_execution"), and the value is the maximum number of concurrently running jobs. This is the same parameter as n_jobs in joblib. Returns ------- List[SimulationResult] List of simulation results """ all_results = [] start = time.time() for test_setting_index, test_setting in enumerate(test_settings): path = Path(root_dir) / str(test_setting_index) / "test_setting.pickle" test_setting.to_pickle(path) print(f"Completed to write test_setting. {path}") test_results = execute_simulation_test_setting_unit( test_setting, test_setting_index, root_dir, exec_sim_check=exec_sim_check, pdf_mode=pdf_mode, parallel_mode=parallel_mode, data_saving=data_saving, is_computation_time_required=is_computation_time_required, is_detailed_results_required=is_detailed_results_required, ) all_results += test_results # Save write_results(all_results, root_dir) elapsed_time = time.time() - start _print_summary(all_results, elapsed_time) return all_results
def _print_summary(results: List[SimulationResult], elapsed_time: float) -> None: def _to_dict(result: SimulationResult) -> dict: check_result = {} for r in result.check_result["results"]: if r["name"] == "Consistency": check_result["Consistency_possibly_ok"] = r["detail"]["possibly_ok"] check_result["Consistency_to_be_checked"] = r["detail"]["to_be_checked"] else: check_result[r["name"]] = r["result"] return check_result result_dict_list = [_to_dict(result) for result in results] df = pd.DataFrame(result_dict_list) start_red = "\033[31m" start_green = "\033[32m" start_yellow = "\033[33m" end_color = "\033[0m" result_lines = [] for col in df.columns: if col == "Consistency_to_be_checked": continue ok_n = df[df[col]].shape[0] ng_n = df[~df[col]].shape[0] if col == "Consistency_possibly_ok": result_line = f"Consistency:\n" result_line += f"{start_green}OK: {ok_n} cases{end_color}, {start_red}NG: {ng_n} cases{end_color}\n" to_be_checked_n = df[df["Consistency_to_be_checked"]].shape[0] result_line += f"You need to check report: {to_be_checked_n} cases\n" else: result_line = f"{col}:\n" result_line += f"{start_green}OK: {ok_n} cases{end_color}, {start_red}NG: {ng_n} cases{end_color}\n" result_lines.append(result_line) def _to_h_m_s(sec) -> tuple: m, s = divmod(int(sec), 60) h, m = divmod(m, 60) return h, m, s h, m, s = _to_h_m_s(elapsed_time) time_text = "{:.1f}s ".format(elapsed_time) time_text += f"({h}:{str(m).zfill(2)}:{str(s).zfill(2)})" summary_text = ( f"{start_yellow}=============== Summary ================={end_color}\n" ) summary_text += "\n".join(result_lines) summary_text += f"\nTime:\n{time_text}\n" summary_text += ( f"{start_yellow}========================================={end_color}" ) print(summary_text) # re-estimate
[docs]def re_estimate_case_unit( input_root_dir: str, case_index: int, sample_index: int, test_setting_index: int, output_root_dir: str, test_setting=None, exec_sim_check: dict = None, ) -> SimulationResult: if test_setting is None: test_setting_pickle_path = ( f"{input_root_dir}/{test_setting_index}/test_setting.pickle" ) with open(test_setting_pickle_path, "rb") as f: test_setting = pickle.load(f) # Load pickle simulation_result_path = ( Path(input_root_dir) / str(test_setting_index) / str(sample_index) / f"case_{case_index}_result.pickle" ) with open(simulation_result_path, "rb") as f: source_sim_result = pickle.load(f) sim_setting = source_sim_result.simulation_setting empi_dists_seqences = source_sim_result.empi_dists_sequences print(f"Case {case_index}: {sim_setting.name}") org_sim_setting = sim_setting.copy() # Generate QTomography qtomography = sim.generate_qtomography( sim_setting, para=test_setting.parametrizations[case_index], ) # Re-estimate estimation_results = [] for n_rep_index in range(sim_setting.n_rep): empi_dists_seq = source_sim_result.empi_dists_sequences[n_rep_index] estimator = copy.deepcopy(source_sim_result.simulation_setting.estimator) if isinstance(estimator, LossMinimizationEstimator): estimation_result = estimator.calc_estimate_sequence( qtomography, empi_dists_seq, loss=sim_setting.loss, loss_option=sim_setting.loss_option, algo=sim_setting.algo, algo_option=sim_setting.algo_option, is_computation_time_required=True, ) else: estimation_result = estimator.calc_estimate_sequence( qtomography, empi_dists_seq, is_computation_time_required=True, ) estimation_results.append(estimation_result) result_index = dict( test_setting_index=test_setting_index, sample_index=sample_index, case_index=case_index, ) re_estimated_sim_result = SimulationResult( estimation_results=estimation_results, empi_dists_sequences=empi_dists_seqences, qtomography=qtomography, simulation_setting=sim_setting, result_index=result_index, ) # Simulation Check sim_check = StandardQTomographySimulationCheck(re_estimated_sim_result) check_result = sim_check.execute_all( show_detail=False, with_detail=True, exec_check=exec_sim_check ) # Show result if not check_result["total_result"]: start_red = "\033[31m" end_color = "\033[0m" print(f"Total Result: {start_red}NG{end_color}") re_estimated_sim_result.simulation_setting = org_sim_setting re_estimated_sim_result.check_result = check_result # Save write_result_case_unit(re_estimated_sim_result, root_dir=output_root_dir) return re_estimated_sim_result
[docs]def re_estimate_sample_unit( test_setting_index, sample_index, output_root_dir, input_root_dir, exec_sim_check: dict = None, pdf_mode: str = "only_ng", ) -> List[SimulationResult]: # Load test setting pickle test_setting_pickle_path = ( f"{input_root_dir}/{test_setting_index}/test_setting.pickle" ) with open(test_setting_pickle_path, "rb") as f: test_setting = pickle.load(f) case_n = len(test_setting.case_names) results = [] for case_index in range(case_n): result = re_estimate_case_unit( test_setting=test_setting, case_index=case_index, sample_index=sample_index, test_setting_index=test_setting_index, input_root_dir=input_root_dir, output_root_dir=output_root_dir, exec_sim_check=exec_sim_check, ) results.append(result) # Save write_result_sample_unit(results, root_dir=output_root_dir) # Save PDF if pdf_mode == "all": write_pdf_report(results, output_root_dir, display_items=exec_sim_check) elif pdf_mode == "only_ng": total_results = [r.check_result["total_result"] for r in results] print(f"total_result={np.all(total_results)}") if not np.all(total_results): write_pdf_report(results, output_root_dir, display_items=exec_sim_check) elif pdf_mode == "none": pass else: message = "`pdf_mode` must be 'all', 'only_ng', or 'none'." raise ValueError(message) return results
[docs]def re_estimate_test_setting_unit( test_setting_index, output_root_dir, input_root_dir, exec_sim_check: dict = None, pdf_mode: str = "only_ng", ) -> List[SimulationResult]: # Load test setting from pickle test_setting_pickle_path = ( f"{input_root_dir}/{test_setting_index}/test_setting.pickle" ) with open(test_setting_pickle_path, "rb") as f: test_setting = pickle.load(f) n_sample = test_setting.n_sample results = [] for sample_index in range(n_sample): sample_results = re_estimate_sample_unit( test_setting_index=test_setting_index, sample_index=sample_index, input_root_dir=input_root_dir, output_root_dir=output_root_dir, exec_sim_check=exec_sim_check, pdf_mode=pdf_mode, ) results += sample_results # Save write_result_test_setting_unit(results, output_root_dir, test_setting_index) return results
[docs]def re_estimate_test_settings( input_root_dir: str, output_root_dir: str, pdf_mode: str, exec_sim_check: dict = None, ) -> List[SimulationResult]: # Load All Test Setting test_setting_pickle_paths = sorted( Path(input_root_dir).glob("*/test_setting.pickle") ) all_results = [] for path in test_setting_pickle_paths: test_setting_index = path.parent.name # directory name is test_setting_index test_results = re_estimate_test_setting_unit( test_setting_index, input_root_dir=input_root_dir, output_root_dir=output_root_dir, exec_sim_check=exec_sim_check, pdf_mode=pdf_mode, ) all_results += test_results return all_results
# writer
[docs]def write_results(results: List[SimulationResult], dir_path: str) -> None: dir_path = Path(dir_path) dir_path.mkdir(parents=True, exist_ok=True) path = dir_path / "check_result.csv" result_dict_list = [result.to_dict() for result in results] sample_result_df = pd.DataFrame(result_dict_list) sample_result_df.to_csv(path, index=None) print(f"Completed to write csv. {path}")
[docs]def write_result_sample_unit(results: List[SimulationResult], root_dir: str) -> None: test_setting_index = results[0].result_index["test_setting_index"] sample_index = results[0].result_index["sample_index"] dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index) write_results(results, dir_path)
[docs]def write_result_test_setting_unit( results: List[SimulationResult], root_dir: str, test_setting_index: int ) -> None: dir_path = Path(root_dir) / str(test_setting_index) write_results(results, dir_path)
[docs]def write_pdf_report( results: List[SimulationResult], root_dir: str, display_items: dict = None ) -> None: test_setting_index = results[0].result_index["test_setting_index"] sample_index = results[0].result_index["sample_index"] dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index) dir_path.mkdir(parents=True, exist_ok=True) path = dir_path / f"{test_setting_index}_{sample_index}_quara_report.pdf" report.export_report_from_index( input_root_dir=root_dir, test_setting_index=test_setting_index, sample_index=sample_index, output_path=path, display_items=display_items, )
[docs]def write_result_case_unit(sim_result: SimulationResult, root_dir: str) -> None: test_setting_index = sim_result.result_index["test_setting_index"] sample_index = sim_result.result_index["sample_index"] case_index = sim_result.result_index["case_index"] # Save pickle dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index) path = dir_path / f"case_{case_index}_result.pickle" sim_result.to_pickle(path) # Save JSON # EstimationResult cannot be converted to JSON. # Therefore, alternative text is used. alternative_results = [] for r in sim_result.check_result["results"]: if r["name"] == "Consistency": alternative_text = "EstimationResult generated in the process of ConsistencyCheck is not dumped to json, check the pickle." new_r = copy.deepcopy(r) new_r["detail"]["estimation_result"] = alternative_text alternative_results.append(new_r) else: alternative_results.append(r) alternative_check_result = copy.deepcopy(sim_result.check_result) alternative_check_result["results"] = alternative_results path = dir_path / f"case_{case_index}_check_result.json" with open(path, "w") as f: json.dump( alternative_check_result, f, ensure_ascii=False, indent=4, separators=(",", ": "), )