# Source code for quara.simulation.standard_qtomography_simulation_flow

from itertools import starmap
from typing import List, Union
import copy
import time
from pathlib import Path
import json
import pickle

import numpy as np
import pandas as pd
from tqdm import tqdm

from quara.simulation import standard_qtomography_simulation_report as report

from quara.simulation.standard_qtomography_simulation_check import (
    StandardQTomographySimulationCheck,
)
from quara.simulation import standard_qtomography_simulation as sim
from quara.simulation.standard_qtomography_simulation import (
    EstimatorTestSetting,
    SimulationResult,
    StandardQTomographySimulationSetting,
)
from quara.protocol.qtomography.standard.loss_minimization_estimator import (
    LossMinimizationEstimator,
)


def execute_simulation_case_unit(
    test_setting,
    true_object,
    tester_objects,
    case_index: int,
    sample_index: int,
    test_setting_index: int,
    root_dir: str,
    exec_sim_check: dict = None,
) -> SimulationResult:
    """Run, check, and save the simulation of a single case.

    Parameters
    ----------
    test_setting
        Object providing ``to_simulation_setting`` and ``parametrizations``.
    true_object
        True object forwarded to ``test_setting.to_simulation_setting``.
    tester_objects
        Tester objects forwarded to ``test_setting.to_simulation_setting``.
    case_index : int
        Index of the case inside ``test_setting``.
    sample_index : int
        Index of the sample; stored in the result index.
    test_setting_index : int
        Index of the test setting; stored in the result index.
    root_dir : str
        Root directory handed to ``write_result_case_unit``.
    exec_sim_check : dict, optional
        Forwarded to the simulation check as ``exec_check``.

    Returns
    -------
    SimulationResult
        The simulation result with ``simulation_setting``, ``result_index``
        and ``check_result`` attached.
    """
    # Build the simulation setting for this case.
    setting = test_setting.to_simulation_setting(
        true_object, tester_objects, case_index
    )
    print(f"Case {case_index}: {setting.name}")
    # Keep a copy taken before the setting is used; it is the one stored
    # on the result at the end.
    setting_snapshot = setting.copy()

    # The random number seed is intentionally not set when initializing the
    # qtomography; the stream created below is used when generating the
    # empirical distributions instead.
    case_parametrization = test_setting.parametrizations[case_index]
    qtomography = sim.generate_qtomography(
        setting, para=case_parametrization, init_with_seed=False
    )
    data_stream = np.random.RandomState(setting.seed_data)

    # Execute the simulation.
    sim_result = sim.execute_simulation(
        qtomography=qtomography,
        simulation_setting=setting,
        seed_or_stream=data_stream,
    )

    # Run the simulation checks.
    checker = StandardQTomographySimulationCheck(sim_result)
    check_result = checker.execute_all(
        show_detail=False, with_detail=True, exec_check=exec_sim_check
    )
    total_ok = check_result["total_result"]
    if not total_ok:
        # Print the NG marker in red.
        red, reset = "\033[31m", "\033[0m"
        print(f"Total Result: {red}NG{reset}")

    # Attach the bookkeeping information to the result.
    sim_result.simulation_setting = setting_snapshot
    sim_result.result_index = {
        "test_setting_index": test_setting_index,
        "sample_index": sample_index,
        "case_index": case_index,
    }
    sim_result.check_result = check_result

    # Save
    write_result_case_unit(sim_result, root_dir=root_dir)
    return sim_result
def execute_simulation_sample_unit(
    test_setting,
    generation_settings,
    test_setting_index,
    sample_index,
    root_dir,
    pdf_mode: str = "only_ng",
    stream_qoperation: Union[int, np.random.RandomState] = None,
    exec_sim_check: dict = None,
) -> List[SimulationResult]:
    """Generate one sample and run every case of a test setting on it.

    Parameters
    ----------
    test_setting
        Object providing ``case_names``; forwarded to
        ``execute_simulation_case_unit``.
    generation_settings
        Object providing ``true_setting`` and ``tester_settings``, each with
        a ``generate`` method.
    test_setting_index
        Index of the test setting; forwarded to the case unit.
    sample_index
        Index of the sample; forwarded to the case unit.
    root_dir
        Root directory used by the writer functions.
    pdf_mode : str, optional
        ``"all"`` always writes a PDF report, ``"only_ng"`` writes it only
        when at least one case has a false ``total_result``, ``"none"``
        never writes it.
    stream_qoperation : Union[int, np.random.RandomState], optional
        Passed to the ``generate`` methods when the true setting's
        ``generate`` accepts a ``seed_or_stream`` argument.
    exec_sim_check : dict, optional
        Forwarded to the case unit and, as ``display_items``, to the PDF
        report writer.

    Returns
    -------
    List[SimulationResult]
        One result per case.

    Raises
    ------
    ValueError
        If ``pdf_mode`` is not ``"all"``, ``"only_ng"`` or ``"none"``.
    """
    # Validate up front so that a bad argument is reported before any
    # simulation is run, not after all cases have been executed and written.
    if pdf_mode not in ("all", "only_ng", "none"):
        message = "`pdf_mode` must be 'all', 'only_ng', or 'none'."
        raise ValueError(message)

    # Generate sample.
    # Only pass the stream when `generate` declares `seed_or_stream` among
    # its arguments.
    _f = generation_settings.true_setting.generate
    if "seed_or_stream" in _f.__code__.co_varnames[: _f.__code__.co_argcount]:
        true_object = generation_settings.true_setting.generate(
            seed_or_stream=stream_qoperation
        )
        tester_objects = [
            tester_setting.generate(stream_qoperation)
            for tester_setting in generation_settings.tester_settings
        ]
    else:
        true_object = generation_settings.true_setting.generate()
        tester_objects = [
            tester_setting.generate()
            for tester_setting in generation_settings.tester_settings
        ]

    # `generate` may return a tuple; only its first element is used.
    if isinstance(true_object, tuple):
        true_object = true_object[0]
    tester_objects = [
        tester[0] if isinstance(tester, tuple) else tester
        for tester in tester_objects
    ]

    results = [
        execute_simulation_case_unit(
            test_setting,
            true_object=true_object,
            tester_objects=tester_objects,
            case_index=case_index,
            sample_index=sample_index,
            test_setting_index=test_setting_index,
            root_dir=root_dir,
            exec_sim_check=exec_sim_check,
        )
        for case_index in range(len(test_setting.case_names))
    ]

    # Save
    write_result_sample_unit(results, root_dir=root_dir)

    # Save PDF
    if pdf_mode == "all":
        write_pdf_report(results, root_dir, display_items=exec_sim_check)
    elif pdf_mode == "only_ng":
        all_ok = np.all([r.check_result["total_result"] for r in results])
        print(f"total_result={all_ok}")
        if not all_ok:
            write_pdf_report(results, root_dir, display_items=exec_sim_check)
    # pdf_mode == "none": nothing to write.

    return results
def execute_simulation_test_setting_unit(
    test_setting,
    test_setting_index,
    root_dir,
    exec_sim_check: dict = None,
    pdf_mode: str = "only_ng",
) -> List[SimulationResult]:
    """Run all samples of one test setting and save the combined results.

    Parameters
    ----------
    test_setting
        Object providing ``to_generation_settings``, ``n_sample`` and
        ``seed_qoperation``.
    test_setting_index
        Index of the test setting; forwarded to the sample unit.
    root_dir
        Root directory used by the writer functions.
    exec_sim_check : dict, optional
        Forwarded to the sample unit.
    pdf_mode : str, optional
        Forwarded to the sample unit.

    Returns
    -------
    List[SimulationResult]
        Results of all cases of all samples, in execution order.
    """
    generation_settings = test_setting.to_generation_settings()
    sample_count = test_setting.n_sample
    collected = []

    # A single random stream is shared by every sample of this test setting.
    qoperation_stream = np.random.RandomState(test_setting.seed_qoperation)

    sample_index = 0
    while sample_index < sample_count:
        collected.extend(
            execute_simulation_sample_unit(
                test_setting,
                generation_settings,
                test_setting_index,
                sample_index,
                root_dir,
                pdf_mode=pdf_mode,
                stream_qoperation=qoperation_stream,
                exec_sim_check=exec_sim_check,
            )
        )
        sample_index += 1

    # Save
    write_result_test_setting_unit(collected, root_dir)
    return collected
def execute_simulation_test_settings(
    test_settings: List[EstimatorTestSetting],
    root_dir: str,
    pdf_mode: str = "only_ng",
    exec_sim_check: dict = None,
) -> List[SimulationResult]:
    """Run every test setting, save the results, and print a summary.

    Parameters
    ----------
    test_settings : List[EstimatorTestSetting]
        Test settings to execute; each is pickled under
        ``<root_dir>/<index>/test_setting.pickle`` before it is run.
    root_dir : str
        Root directory for all outputs.
    pdf_mode : str, optional
        Forwarded to ``execute_simulation_test_setting_unit``.
    exec_sim_check : dict, optional
        Forwarded to ``execute_simulation_test_setting_unit``.

    Returns
    -------
    List[SimulationResult]
        Results of all test settings, in execution order.
    """
    collected = []
    started_at = time.time()

    for index, setting in enumerate(test_settings):
        # Persist the test setting next to its results.
        pickle_path = Path(root_dir) / str(index) / "test_setting.pickle"
        setting.to_pickle(pickle_path)
        print(f"Completed to write test_setting. {pickle_path}")

        collected.extend(
            execute_simulation_test_setting_unit(
                setting,
                index,
                root_dir,
                exec_sim_check=exec_sim_check,
                pdf_mode=pdf_mode,
            )
        )

    # Save
    write_results(collected, root_dir)

    _print_summary(collected, time.time() - started_at)
    return collected
def _print_summary(results: List[SimulationResult], elapsed_time: float) -> None:
    """Print OK/NG counts per check and the elapsed time.

    Parameters
    ----------
    results : List[SimulationResult]
        Results whose ``check_result["results"]`` entries are summarized.
        Entries named ``"Consistency"`` contribute their ``possibly_ok`` and
        ``to_be_checked`` details; all other entries contribute ``result``.
    elapsed_time : float
        Elapsed time in seconds.

    Notes
    -----
    A check that is absent from some of the results is counted only over
    the results that contain it, instead of failing on the missing values.
    """

    def _to_dict(result: SimulationResult) -> dict:
        # Flatten the check results of one simulation result into one row.
        row = {}
        for r in result.check_result["results"]:
            if r["name"] == "Consistency":
                row["Consistency_possibly_ok"] = r["detail"]["possibly_ok"]
                row["Consistency_to_be_checked"] = r["detail"]["to_be_checked"]
            else:
                row[r["name"]] = r["result"]
        return row

    df = pd.DataFrame([_to_dict(result) for result in results])

    def _count(col) -> tuple:
        # Drop rows where the check is missing (NaN) before masking; a
        # column containing NaN cannot be used as a boolean mask directly.
        values = df[col].dropna().astype(bool)
        ok_n = int(values.sum())
        return ok_n, len(values) - ok_n

    start_red = "\033[31m"
    start_green = "\033[32m"
    start_yellow = "\033[33m"
    end_color = "\033[0m"

    result_lines = []
    for col in df.columns:
        if col == "Consistency_to_be_checked":
            # Reported together with "Consistency_possibly_ok".
            continue
        ok_n, ng_n = _count(col)
        counts_line = f"{start_green}OK: {ok_n} cases{end_color}, {start_red}NG: {ng_n} cases{end_color}\n"
        if col == "Consistency_possibly_ok":
            to_be_checked_n, _ = _count("Consistency_to_be_checked")
            result_line = "Consistency:\n"
            result_line += counts_line
            result_line += f"You need to check report: {to_be_checked_n} cases\n"
        else:
            result_line = f"{col}:\n"
            result_line += counts_line
        result_lines.append(result_line)

    def _to_h_m_s(sec) -> tuple:
        m, s = divmod(int(sec), 60)
        h, m = divmod(m, 60)
        return h, m, s

    h, m, s = _to_h_m_s(elapsed_time)
    time_text = "{:.1f}s ".format(elapsed_time)
    time_text += f"({h}:{str(m).zfill(2)}:{str(s).zfill(2)})"

    summary_text = (
        f"{start_yellow}=============== Summary ================={end_color}\n"
    )
    summary_text += "\n".join(result_lines)
    summary_text += f"\nTime:\n{time_text}\n"
    summary_text += (
        f"{start_yellow}========================================={end_color}"
    )
    print(summary_text)


# re-estimate
def re_estimate_case_unit(
    input_root_dir: str,
    case_index: int,
    sample_index: int,
    test_setting_index: int,
    output_root_dir: str,
    test_setting=None,
    exec_sim_check: dict = None,
) -> SimulationResult:
    """Re-run the estimation of one saved case and save the new result.

    The empirical distributions and the simulation setting are taken from
    the pickled result found under ``input_root_dir``; only the estimation
    and the simulation checks are executed again.

    Parameters
    ----------
    input_root_dir : str
        Root directory containing the previously written pickles.
    case_index : int
        Index of the case to re-estimate.
    sample_index : int
        Index of the sample the case belongs to.
    test_setting_index : int
        Index of the test setting the case belongs to.
    output_root_dir : str
        Root directory handed to ``write_result_case_unit``.
    test_setting : optional
        Test setting providing ``parametrizations``. When ``None`` it is
        loaded from ``<input_root_dir>/<test_setting_index>/test_setting.pickle``.
    exec_sim_check : dict, optional
        Forwarded to the simulation check as ``exec_check``.

    Returns
    -------
    SimulationResult
        The re-estimated result with ``check_result`` attached.
    """
    # NOTE: pickle.load can execute arbitrary code; only point
    # `input_root_dir` at directories whose contents are trusted.
    if test_setting is None:
        test_setting_pickle_path = (
            f"{input_root_dir}/{test_setting_index}/test_setting.pickle"
        )
        with open(test_setting_pickle_path, "rb") as f:
            test_setting = pickle.load(f)

    # Load the result of the original simulation.
    sample_dir = Path(input_root_dir) / str(test_setting_index) / str(sample_index)
    with open(sample_dir / f"case_{case_index}_result.pickle", "rb") as f:
        source_sim_result = pickle.load(f)

    sim_setting = source_sim_result.simulation_setting
    empi_dists_sequences = source_sim_result.empi_dists_sequences
    print(f"Case {case_index}: {sim_setting.name}")
    # Copy taken before the setting is used; stored on the result at the end.
    setting_snapshot = sim_setting.copy()

    # Generate QTomography
    qtomography = sim.generate_qtomography(
        sim_setting,
        para=test_setting.parametrizations[case_index],
    )

    # Re-estimate
    estimation_results = []
    for n_rep_index in range(sim_setting.n_rep):
        empi_dists_seq = source_sim_result.empi_dists_sequences[n_rep_index]
        # Work on a fresh copy of the estimator for every repetition.
        estimator = copy.deepcopy(source_sim_result.simulation_setting.estimator)

        # Loss-minimization estimators additionally need the loss and the
        # algorithm together with their options.
        extra_kwargs = {}
        if isinstance(estimator, LossMinimizationEstimator):
            extra_kwargs = {
                "loss": sim_setting.loss,
                "loss_option": sim_setting.loss_option,
                "algo": sim_setting.algo,
                "algo_option": sim_setting.algo_option,
            }
        estimation_results.append(
            estimator.calc_estimate_sequence(
                qtomography,
                empi_dists_seq,
                is_computation_time_required=True,
                **extra_kwargs,
            )
        )

    re_estimated_sim_result = SimulationResult(
        estimation_results=estimation_results,
        empi_dists_sequences=empi_dists_sequences,
        qtomography=qtomography,
        simulation_setting=sim_setting,
        result_index={
            "test_setting_index": test_setting_index,
            "sample_index": sample_index,
            "case_index": case_index,
        },
    )

    # Simulation Check
    checker = StandardQTomographySimulationCheck(re_estimated_sim_result)
    check_result = checker.execute_all(
        show_detail=False, with_detail=True, exec_check=exec_sim_check
    )
    if not check_result["total_result"]:
        # Print the NG marker in red.
        red, reset = "\033[31m", "\033[0m"
        print(f"Total Result: {red}NG{reset}")

    re_estimated_sim_result.simulation_setting = setting_snapshot
    re_estimated_sim_result.check_result = check_result

    # Save
    write_result_case_unit(re_estimated_sim_result, root_dir=output_root_dir)
    return re_estimated_sim_result
def re_estimate_sample_unit(
    test_setting_index,
    sample_index,
    output_root_dir,
    input_root_dir,
    exec_sim_check: dict = None,
    pdf_mode: str = "only_ng",
) -> List[SimulationResult]:
    """Re-estimate every case of one saved sample.

    Parameters
    ----------
    test_setting_index
        Index of the test setting the sample belongs to.
    sample_index
        Index of the sample to re-estimate.
    output_root_dir
        Root directory used by the writer functions.
    input_root_dir
        Root directory containing the previously written pickles; the test
        setting is loaded from
        ``<input_root_dir>/<test_setting_index>/test_setting.pickle``.
    exec_sim_check : dict, optional
        Forwarded to the case unit and, as ``display_items``, to the PDF
        report writer.
    pdf_mode : str, optional
        ``"all"`` always writes a PDF report, ``"only_ng"`` writes it only
        when at least one case has a false ``total_result``, ``"none"``
        never writes it.

    Returns
    -------
    List[SimulationResult]
        One re-estimated result per case.

    Raises
    ------
    ValueError
        If ``pdf_mode`` is not ``"all"``, ``"only_ng"`` or ``"none"``.
    """
    # Validate up front so that a bad argument is reported before any file
    # is read or any estimation is run.
    if pdf_mode not in ("all", "only_ng", "none"):
        message = "`pdf_mode` must be 'all', 'only_ng', or 'none'."
        raise ValueError(message)

    # Load test setting pickle.
    # NOTE: pickle.load can execute arbitrary code; only point
    # `input_root_dir` at directories whose contents are trusted.
    test_setting_pickle_path = (
        f"{input_root_dir}/{test_setting_index}/test_setting.pickle"
    )
    with open(test_setting_pickle_path, "rb") as f:
        test_setting = pickle.load(f)

    results = [
        re_estimate_case_unit(
            test_setting=test_setting,
            case_index=case_index,
            sample_index=sample_index,
            test_setting_index=test_setting_index,
            input_root_dir=input_root_dir,
            output_root_dir=output_root_dir,
            exec_sim_check=exec_sim_check,
        )
        for case_index in range(len(test_setting.case_names))
    ]

    # Save
    write_result_sample_unit(results, root_dir=output_root_dir)

    # Save PDF
    if pdf_mode == "all":
        write_pdf_report(results, output_root_dir, display_items=exec_sim_check)
    elif pdf_mode == "only_ng":
        all_ok = np.all([r.check_result["total_result"] for r in results])
        print(f"total_result={all_ok}")
        if not all_ok:
            write_pdf_report(results, output_root_dir, display_items=exec_sim_check)
    # pdf_mode == "none": nothing to write.

    return results
def re_estimate_test_setting_unit(
    test_setting_index,
    output_root_dir,
    input_root_dir,
    exec_sim_check: dict = None,
    pdf_mode: str = "only_ng",
) -> List[SimulationResult]:
    """Re-estimate every sample of one saved test setting.

    Parameters
    ----------
    test_setting_index
        Index of the test setting to re-estimate.
    output_root_dir
        Root directory used by the writer functions.
    input_root_dir
        Root directory containing the previously written pickles; the test
        setting is loaded from
        ``<input_root_dir>/<test_setting_index>/test_setting.pickle``.
    exec_sim_check : dict, optional
        Forwarded to the sample unit.
    pdf_mode : str, optional
        Forwarded to the sample unit.

    Returns
    -------
    List[SimulationResult]
        Results of all cases of all samples, in execution order.
    """
    # Only the number of samples is needed from the pickled test setting.
    # NOTE: pickle.load can execute arbitrary code; only point
    # `input_root_dir` at directories whose contents are trusted.
    pickle_path = f"{input_root_dir}/{test_setting_index}/test_setting.pickle"
    with open(pickle_path, "rb") as f:
        loaded_setting = pickle.load(f)
    sample_count = loaded_setting.n_sample

    collected = []
    sample_index = 0
    while sample_index < sample_count:
        collected.extend(
            re_estimate_sample_unit(
                test_setting_index=test_setting_index,
                sample_index=sample_index,
                input_root_dir=input_root_dir,
                output_root_dir=output_root_dir,
                exec_sim_check=exec_sim_check,
                pdf_mode=pdf_mode,
            )
        )
        sample_index += 1

    # Save
    write_result_test_setting_unit(collected, output_root_dir)
    return collected
def _parse_test_setting_index(name: str) -> Union[int, str]:
    """Return a directory name as ``int`` when it is a canonical integer."""
    try:
        value = int(name)
    except ValueError:
        return name
    # Keep names such as "007" as-is: converting them would no longer
    # point at the same directory when the index is turned back into a path.
    return value if str(value) == name else name


def re_estimate_test_settings(
    input_root_dir: str,
    output_root_dir: str,
    pdf_mode: str = "only_ng",
    exec_sim_check: dict = None,
) -> List[SimulationResult]:
    """Re-estimate every test setting found under ``input_root_dir``.

    A test setting is every direct sub-directory of ``input_root_dir`` that
    contains a ``test_setting.pickle`` file; the directory name is the test
    setting index.

    Parameters
    ----------
    input_root_dir : str
        Root directory containing the previously written pickles.
    output_root_dir : str
        Root directory for the re-estimated outputs.
    pdf_mode : str, optional
        Forwarded to ``re_estimate_test_setting_unit``.
    exec_sim_check : dict, optional
        Forwarded to ``re_estimate_test_setting_unit``.

    Returns
    -------
    List[SimulationResult]
        Results of all test settings. Integer-named directories are
        processed first in numeric order, followed by any other directory
        names in lexicographic order.
    """
    # Load All Test Setting
    indices = [
        _parse_test_setting_index(path.parent.name)
        for path in Path(input_root_dir).glob("*/test_setting.pickle")
    ]
    # Sort numerically so that e.g. "10" comes after "2", and pass integer
    # indices on, as `execute_simulation_test_settings` does.
    indices.sort(
        key=lambda index: (
            isinstance(index, str),
            index if isinstance(index, int) else 0,
            str(index),
        )
    )

    all_results = []
    for test_setting_index in indices:
        all_results += re_estimate_test_setting_unit(
            test_setting_index,
            input_root_dir=input_root_dir,
            output_root_dir=output_root_dir,
            exec_sim_check=exec_sim_check,
            pdf_mode=pdf_mode,
        )
    return all_results
# writer
def write_results(results: List[SimulationResult], dir_path: str) -> None:
    """Write the results as ``check_result.csv`` inside ``dir_path``.

    Parameters
    ----------
    results : List[SimulationResult]
        Results to write; each contributes one row via its ``to_dict``
        method.
    dir_path : str
        Output directory; created (including parents) when missing.
    """
    output_dir = Path(dir_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    csv_path = output_dir / "check_result.csv"

    rows = []
    for result in results:
        rows.append(result.to_dict())

    frame = pd.DataFrame(rows)
    frame.to_csv(csv_path, index=None)
    print(f"Completed to write csv. {csv_path}")
def write_result_sample_unit(results: List[SimulationResult], root_dir: str) -> None:
    """Write the results of one sample below ``root_dir``.

    The output directory is
    ``<root_dir>/<test_setting_index>/<sample_index>``, with both indices
    taken from the ``result_index`` of the first result.

    Parameters
    ----------
    results : List[SimulationResult]
        Results of a single sample. When empty, nothing is written because
        there is no result to take the indices from.
    root_dir : str
        Root directory for the outputs.
    """
    if not results:
        # Without any result the target directory cannot be determined.
        return

    first_index = results[0].result_index
    test_setting_index = first_index["test_setting_index"]
    sample_index = first_index["sample_index"]

    dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index)
    write_results(results, dir_path)
def write_result_test_setting_unit(
    results: List[SimulationResult], root_dir: str
) -> None:
    """Write the results of one test setting below ``root_dir``.

    The output directory is ``<root_dir>/<test_setting_index>``, with the
    index taken from the ``result_index`` of the first result.

    Parameters
    ----------
    results : List[SimulationResult]
        Results of a single test setting. When empty, nothing is written
        because there is no result to take the index from.
    root_dir : str
        Root directory for the outputs.
    """
    if not results:
        # Without any result the target directory cannot be determined.
        return

    test_setting_index = results[0].result_index["test_setting_index"]
    dir_path = Path(root_dir) / str(test_setting_index)
    write_results(results, dir_path)
def write_pdf_report(
    results: List[SimulationResult], root_dir: str, display_items: dict = None
) -> None:
    """Export the PDF report of one sample.

    The report is written to
    ``<root_dir>/<test_setting_index>/<sample_index>/<test_setting_index>_<sample_index>_quara_report.pdf``
    through ``report.export_report_from_index``, with both indices taken
    from the ``result_index`` of the first result.

    Parameters
    ----------
    results : List[SimulationResult]
        Results of a single sample. When empty, nothing is written because
        there is no result to take the indices from.
    root_dir : str
        Root directory; used both as the report's ``input_root_dir`` and as
        the base of the output path.
    display_items : dict, optional
        Forwarded to ``report.export_report_from_index``.
    """
    if not results:
        # Without any result the sample to report on cannot be determined.
        return

    first_index = results[0].result_index
    test_setting_index = first_index["test_setting_index"]
    sample_index = first_index["sample_index"]

    dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index)
    dir_path.mkdir(parents=True, exist_ok=True)
    path = dir_path / f"{test_setting_index}_{sample_index}_quara_report.pdf"

    report.export_report_from_index(
        input_root_dir=root_dir,
        test_setting_index=test_setting_index,
        sample_index=sample_index,
        output_path=path,
        display_items=display_items,
    )
def write_result_case_unit(sim_result: SimulationResult, root_dir: str) -> None:
    """Save one case result as a pickle and its check result as JSON.

    Both files are written to
    ``<root_dir>/<test_setting_index>/<sample_index>/`` as
    ``case_<case_index>_result.pickle`` and
    ``case_<case_index>_check_result.json``; the indices come from
    ``sim_result.result_index``.

    Parameters
    ----------
    sim_result : SimulationResult
        Result providing ``result_index``, ``check_result`` and
        ``to_pickle``. It is not modified.
    root_dir : str
        Root directory for the outputs.
    """
    result_index = sim_result.result_index
    test_setting_index = result_index["test_setting_index"]
    sample_index = result_index["sample_index"]
    case_index = result_index["case_index"]

    dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index)
    # The JSON file below is opened directly, so make sure the directory
    # exists instead of relying on `to_pickle` to create it.
    dir_path.mkdir(parents=True, exist_ok=True)

    # Save pickle
    sim_result.to_pickle(dir_path / f"case_{case_index}_result.pickle")

    # Save JSON
    # EstimationResult cannot be converted to JSON.
    # Therefore, alternative text is used.
    alternative_text = "EstimationResult generated in the process of ConsistencyCheck is not dumped to json, check the pickle."
    alternative_results = []
    for r in sim_result.check_result["results"]:
        if r["name"] == "Consistency":
            # Shallow copies are enough: only the replaced entry differs and
            # the originals are left untouched, so the estimation result
            # itself never needs to be copied.
            r = {
                **r,
                "detail": {**r["detail"], "estimation_result": alternative_text},
            }
        alternative_results.append(r)

    alternative_check_result = {
        **sim_result.check_result,
        "results": alternative_results,
    }

    path = dir_path / f"case_{case_index}_check_result.json"
    # ensure_ascii=False emits non-ASCII characters as-is, so the file
    # encoding is fixed to UTF-8 instead of depending on the locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(
            alternative_check_result,
            f,
            ensure_ascii=False,
            indent=4,
            separators=(",", ": "),
        )