import time
from typing import Callable, List, Optional, Union
import copy
from collections import namedtuple
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
from tqdm import tqdm
from quara.objects.povm import Povm
from quara.objects.gate import Gate
from quara.objects.qoperation import QOperation
from quara.objects.state import State
from quara.protocol.qtomography.standard.standard_qst import StandardQst
from quara.protocol.qtomography.standard.standard_qtomography_estimator import (
StandardQTomographyEstimator,
StandardQTomographyEstimationResult,
)
from quara.simulation.standard_qtomography_simulation import (
StandardQTomographySimulationSetting,
)
from quara.utils import matrix_util
from quara.protocol.qtomography.estimator import EstimationResult
[docs]def calc_mse_general_norm(
xs: List[np.ndarray],
y: np.ndarray,
norm_function: Callable[[np.ndarray, np.ndarray], np.float64],
) -> np.float64:
"""calculates mse(mean squared error) of ``xs`` and ``y`` according to ``norm_function``.
Parameters
----------
xs : np.ndarray
sample values.
y : np.ndarray
true value.
norm_function : Callable[[np.ndarray, np.ndarray], np.float64]
norm function.
Returns
-------
np.float64
:math:`\\text{mse} = \\frac{1}{len(x)} \\sum_i \\text{norm_function}(x_i, y)^2`
"""
norms = []
for x in xs:
norm = norm_function(x, y) ** 2
norms.append(norm)
mse = np.mean(norms, dtype=np.float64)
return mse
[docs]def calc_covariance_matrix_of_prob_dist(
prob_dist: np.ndarray, data_num: int
) -> np.ndarray:
"""calculates covariance matrix of probability distribution.
Parameters
----------
prob_dist : np.ndarray
probability distribution.
data_num : int
number of data.
Returns
-------
np.ndarray
:math:`\\text{covariance matrix} = \\frac{1}{len(x)} (diag(p) - p \\cdot p^T)`, where N is ``data_num`` and p is ``prob_dist``.
"""
matrix = np.diag(prob_dist) - np.array([prob_dist]).T @ np.array([prob_dist])
return matrix / data_num
[docs]def calc_covariance_matrix_of_prob_dists(
prob_dists: List[np.ndarray], data_num: int
) -> np.ndarray:
"""calculates covariance matrix of probability distributions(= direct product of each covariance matrix of probability distribution).
Parameters
----------
prob_dists : List[np.ndarray]
probability distributions.
data_num : int
number of data.
Returns
-------
np.ndarray
direct product of each covariance matrix = :math:`\\oplus_j V(p^j)`, where :math:`V(p)` is covariance matrix of p.
"""
# calculate diagonal blocks
diag_blocks = [
calc_covariance_matrix_of_prob_dist(prob_dist, data_num)
for prob_dist in prob_dists
]
# calculate direct product of each covariance matrix of probability distribution
matrix_size = np.sum([len(prob_dist) for prob_dist in prob_dists])
matrix = np.zeros((matrix_size, matrix_size))
index = 0
for diag in diag_blocks:
size = diag.shape[0]
matrix[index : index + size, index : index + size] = diag
index += size
return matrix
def _calc_mse_linear_analytical_mode_qoperation(
xs: List[QOperation], ys: List[QOperation], with_std: bool = True
) -> np.float64:
points = []
for x, y in zip(xs, ys):
x_vec = x.to_stacked_vector()
y_vec = y.to_stacked_vector()
point = np.vdot(x_vec - y_vec, x_vec - y_vec)
points.append(point)
mse = np.mean(points, dtype=np.float64)
if with_std:
std = np.std(points, dtype=np.float64, ddof=1)
return mse, std
else:
return mse
def _calc_mse_linear_analytical_mode_var(
xs: List[QOperation], ys: List[QOperation]
) -> np.float64:
raise NotImplementedError()
# common
# statistical quantity
[docs]def calc_mse_qoperations(
xs: List[QOperation],
ys: List[QOperation],
mode: str = "qoperation",
with_std: bool = True,
) -> np.float64:
if mode == "qoperation":
return _calc_mse_linear_analytical_mode_qoperation(xs, ys, with_std=with_std)
elif mode == "var":
return _calc_mse_linear_analytical_mode_var(xs, ys)
else:
error_message = "The argument `mode` must be `qoperation` or `var`"
raise ValueError(error_message)
# common(StandardQTomography)
[docs]def convert_to_series(
results: List[StandardQTomographyEstimationResult], true_object: QOperation
):
# calc mse
results_tmp = [result.estimated_qoperation_sequence for result in results]
results_tmp = [
list(qoperation_sequence) for qoperation_sequence in zip(*results_tmp)
]
mses = [
calc_mse_qoperations(
qoperation_sequence, [true_object] * len(qoperation_sequence)
)
for qoperation_sequence in results_tmp
]
stds = [mse[1] for mse in mses]
mses = [mse[0] for mse in mses]
# convert to computation time series
comp_time_tmp = [result.computation_times for result in results]
comp_time = [list(comp_time) for comp_time in zip(*comp_time_tmp)]
return mses, stds, comp_time
# StandardQst, StandardQTomographyEstimator
[docs]def calc_estimate_with_average_comp_time(
tester_povms: List[Povm],
true_object: State,
num_data: List[int],
iteration: int,
estimator=StandardQTomographyEstimator,
on_para_eq_constraint: bool = True,
) -> StandardQTomographyEstimationResult:
qst = StandardQst(tester_povms, on_para_eq_constraint=on_para_eq_constraint)
# generate empi dists
empi_dists_seqs = []
for ite in range(iteration):
seeds = [ite] * len(num_data)
empi_dists_seq = qst.generate_empi_dists_sequence(true_object, num_data, seeds)
empi_dists_seqs.append(empi_dists_seq)
# transpose
empi_dists_seqs = [list(empi_dists_seq) for empi_dists_seq in zip(*empi_dists_seqs)]
# calc estimate
results = []
for ite, empi_dists_seq in enumerate(empi_dists_seqs):
start_time = time.time()
result = estimator.calc_estimate_sequence(qst, empi_dists_seq)
comp_time = time.time() - start_time
info = {
"iteration": ite + 1,
"data": empi_dists_seq,
"estimated_var_sequence": result.estimated_var_sequence,
"computation_times": comp_time,
}
result._computation_times = [comp_time]
results.append(result)
return results
# common(show log-log scatter?)
[docs]def show_mse(num_data: List[int], mses: List[float], title: str = "Mean squared error"):
trace = go.Scatter(x=num_data, y=mses, mode="lines+markers")
data = [trace]
layout = go.Layout(
title=title,
xaxis_title_text="Number of data",
yaxis_title_text="Mean squared error of estimates and true",
xaxis_type="log",
yaxis_type="log",
)
fig = go.Figure(data=data, layout=layout)
fig.show()
[docs]def make_mses_graph(
num_data: List[int],
mses: List[List[float]],
error_bar_values_list: List[List[float]] = None,
title: str = "Mean squared error",
additional_title_text: str = "",
names: Optional[List[str]] = None,
yaxis_title_text: str = "Mean squared error of estimates and true",
) -> List["Figure"]:
if not names:
names = [f"data_{i}" for i in range(len(mses))]
data = []
for i, mse in enumerate(mses):
error_y = dict(visible=False)
if error_bar_values_list and i < len(error_bar_values_list):
error_y = dict(
type="data", # value of error bar given in data coordinates
array=error_bar_values_list[i],
visible=True,
)
trace = go.Scatter(
x=num_data,
y=mse,
mode="lines+markers",
name=names[i],
error_y=error_y,
)
data.append(trace)
if additional_title_text:
title = f"{title}<br>{additional_title_text}"
layout = go.Layout(
title=title,
xaxis_title_text="Number of data",
yaxis_title_text=yaxis_title_text,
xaxis_type="log",
yaxis_type="log",
)
fig = go.Figure(data=data, layout=layout)
return fig
def _recreate_qoperation(
source_qoperation: "Qoperation", on_para_eq_constraint: bool
) -> "Qoperation":
# Make QOperation
true_object = copy.copy(source_qoperation)
if type(true_object) == State:
true_object_copied = State(
vec=true_object.vec,
c_sys=true_object.composite_system,
on_para_eq_constraint=on_para_eq_constraint,
)
elif type(true_object) == Povm:
true_object_copied = Povm(
vecs=true_object.vecs,
c_sys=true_object.composite_system,
on_para_eq_constraint=on_para_eq_constraint,
)
elif type(true_object) == Gate:
true_object_copied = Gate(
hs=true_object.hs,
c_sys=true_object.composite_system,
on_para_eq_constraint=on_para_eq_constraint,
)
else:
print(type(true_object))
message = f"true_object must be State, Povm, or Gate, not {type(true_object)}"
raise TypeError(message)
return true_object_copied
[docs]def make_mses_graph_estimation_results(
estimation_results_list: List["LinearEstimationResult"],
case_names: List[str],
true_object,
title: str = "Mean squared error",
additional_title_text: str = "",
show_analytical_results: bool = False,
estimator_list: list = None,
) -> "Figure":
num_data = estimation_results_list[0][0].num_data
mses_list = []
error_bar_values_list = []
n_rep = len(estimation_results_list[0])
display_case_names = case_names[:]
for estimation_results in estimation_results_list:
mses, sds, _ = convert_to_series(estimation_results, true_object)
mses_list.append(mses)
error_bar_values = [sigma / np.sqrt(n_rep) for sigma in sds]
error_bar_values_list.append(error_bar_values)
# calc analytical result
if show_analytical_results:
if not estimator_list:
raise ValueError(
"'show_analytical_results' is set True. But, 'estimator_list' is None."
)
(
analytical_mses,
analytical_case_names,
_,
_,
_,
) = _make_data_for_graphs_mses_analytical(
estimation_results_list, true_object, estimator_list
)
mses_list += analytical_mses
display_case_names += analytical_case_names
fig = make_mses_graph(
num_data,
mses_list,
names=display_case_names,
title=title,
additional_title_text=additional_title_text,
error_bar_values_list=error_bar_values_list,
)
return fig
def _make_data_for_graphs_mses_analytical(
estimation_results_list: List["EstimationResult"],
true_object,
estimator_list: List[Union["Estimator", str]],
):
if len(estimation_results_list) != len(estimator_list):
message = "`estimation_results_list` and `estimator_list` lengths do not match"
raise ValueError(message)
num_data = estimation_results_list[0][0].num_data
mses_list = []
qtomo_list = [e_list[0].qtomography for e_list in estimation_results_list]
qtomo_type_dict = {}
QTomoType = namedtuple(
"QTomoType", ["qtomography_name", "on_para_eq_constraint", "estimator_name"]
)
for i, qtomo in enumerate(qtomo_list):
if type(estimator_list[i]) == str:
estimator_name = estimator_list[i]
else:
estimator_name = estimator_list[i].__class__.__name__
qtomo_type = QTomoType(
qtomo.__class__.__name__,
qtomo.on_para_eq_constraint,
estimator_name,
)
qtomo_type_dict[qtomo_type] = qtomo
estimator_table = {
"LinearEstimator": "calc_mse_linear_analytical",
"LossMinimizationEstimator": "calc_cramer_rao_bound",
}
display_case_names = []
short_names = []
parameters = []
for qtomo_type, qtomo in qtomo_type_dict.items():
parameter = qtomo_type.on_para_eq_constraint
estimator_name = qtomo_type.estimator_name
if estimator_name not in estimator_table:
continue
method_name = estimator_table[estimator_name]
true_object_copied = _recreate_qoperation(
true_object, on_para_eq_constraint=parameter
)
true_mses = []
for num in num_data:
method = eval(f"qtomo.{method_name}")
if method_name == "calc_mse_linear_analytical":
true_mse = method(true_object_copied, [num] * qtomo.num_schedules)
elif method_name == "calc_cramer_rao_bound":
true_mse = method(true_object_copied, num, [num] * qtomo.num_schedules)
true_mses.append(true_mse)
mses_list.append(true_mses)
short_name = estimator_name.replace("Estimator", "")
if short_name == "LossMinimization":
short_name = "Cramer-Rao bound"
display_case_names.append(f"Analytical result ({short_name}, {parameter})")
short_names.append(short_name)
parameters.append(parameter)
return mses_list, display_case_names, short_names, parameters, num_data
[docs]def make_mses_graph_analytical(
estimation_results_list: List["LinearEstimationResult"],
true_object,
estimator_list: list,
) -> "Figure":
num_data = estimation_results_list[0][0].num_data
(
mses_list,
display_case_names,
short_names,
parameters,
num_data,
) = _make_data_for_graphs_mses_analytical(
estimation_results_list, true_object, estimator_list
)
figs = []
# make graphs by estimator
data_dict = {}
for i, estimator_name in enumerate(short_names):
if estimator_name in data_dict:
data_dict[estimator_name]["mses"].append(mses_list[i])
data_dict[estimator_name]["display_case_names"].append(
display_case_names[i]
)
data_dict[estimator_name]["parameters"].append(parameters[i])
else:
data_dict[estimator_name] = dict(
mses=[mses_list[i]],
display_case_names=[display_case_names[i]],
parameters=[parameters[i]],
)
for key, target_dict in data_dict.items():
if key == "Cramer-Rao bound":
fig = make_mses_graph(
num_data,
target_dict["mses"],
names=target_dict["display_case_names"],
additional_title_text=f"Analytical result<br>{key}",
)
else:
fig = make_mses_graph(
num_data,
target_dict["mses"],
names=target_dict["display_case_names"],
additional_title_text=f"Analytical result<br>estimator={key}",
)
figs.append(fig)
# make graphs by parameter
data_dict = {}
for i, parameter in enumerate(parameters):
if parameter in data_dict:
data_dict[parameter]["mses"].append(mses_list[i])
data_dict[parameter]["display_case_names"].append(display_case_names[i])
data_dict[parameter]["parameters"].append(parameters[i])
else:
data_dict[parameter] = dict(
mses=[mses_list[i]],
display_case_names=[display_case_names[i]],
parameters=[parameters[i]],
)
for key, target_dict in data_dict.items():
fig = make_mses_graph(
num_data,
target_dict["mses"],
names=target_dict["display_case_names"],
additional_title_text=f"Analytical result<br>parametorization={key}",
)
figs.append(fig)
return figs
[docs]def make_mses_graphs_estimator(
estimation_results_list: List["EstimationResult"],
simulation_settings: List["StandardQTomographySimulationSetting"],
true_object,
) -> list:
data_dict = {}
Category = namedtuple("Category", ["estimator", "loss", "algo"])
for i, s in enumerate(simulation_settings):
estimator_name = s.estimator.__class__.__name__
loss_name = s.loss.__class__.__name__ if s.loss else None
algo_name = s.algo.__class__.__name__ if s.algo else None
results = estimation_results_list[i]
case_name = s.name
category = Category(estimator_name, loss_name, algo_name)
if category in data_dict:
data_dict[category]["estimation_results"].append(results)
data_dict[category]["case_names"].append(case_name)
data_dict[category]["estimators"].append(estimator_name)
data_dict[category]["losses"].append(loss_name)
data_dict[category]["algos"].append(algo_name)
else:
data_dict[category] = dict(
estimation_results=[results],
case_names=[s.name],
estimators=[s.estimator],
losses=[s.loss],
algos=[s.algo],
)
figs = []
for key, target_dict in data_dict.items():
style = "font-size: 14px;"
additional_title_text = (
f'<span style="{style}">Estimator={key.estimator.replace("Estimator", "")}'
)
if key.loss is not None:
additional_title_text += f"<br>Loss={key.loss}"
if key.algo is not None:
additional_title_text += f"<br>Algo={key.algo}"
additional_title_text += "</span>"
fig = make_mses_graph_estimation_results(
target_dict["estimation_results"],
target_dict["case_names"],
true_object,
additional_title_text=additional_title_text,
show_analytical_results=True,
estimator_list=target_dict["estimators"],
)
fig.update_layout(title=dict(yanchor="bottom", y=0.96))
figs.append(fig)
return figs
[docs]def make_mses_graphs_para(
estimation_results_list: List["EstimationResult"],
case_names: List[str],
true_object: "QOperation",
) -> list:
def _get_parameter(estimation_results: List["EstimationResult"]) -> bool:
return estimation_results[0].qtomography.on_para_eq_constraint
# Split data (True/False)
true_dict = dict(title="True", estimation_results=[], case_names=[])
false_dict = dict(title="False", estimation_results=[], case_names=[])
for i, results in enumerate(estimation_results_list):
if _get_parameter(results):
# on_para_eq_constraint = True
true_dict["estimation_results"].append(results)
true_dict["case_names"].append(case_names[i])
else:
# on_para_eq_constraint = False
false_dict["estimation_results"].append(results)
false_dict["case_names"].append(case_names[i])
# Make figure
figs = []
for target_dict in [true_dict, false_dict]:
if not target_dict["case_names"]:
continue
fig = make_mses_graph_estimation_results(
target_dict["estimation_results"],
target_dict["case_names"],
true_object,
additional_title_text=f"parametrization={target_dict['title']}",
)
figs.append(fig)
return figs
[docs]def show_mses(
num_data: List[int],
mses: List[List[float]],
title: str = "Mean squared error",
names: Optional[List[str]] = None,
):
fig = make_mses_graph(num_data=num_data, mses=mses, title=title, names=names)
fig.show()
# common(depend on "num_data")
[docs]def show_computation_times(
num_data: List[int],
computation_times_sequence: List[List[float]],
title: str = "Computation times for each estimate",
histnorm: str = "count",
):
if not histnorm in ["count", "percent", "frequency"]:
raise ValueError(
f"histnorm is in ['count', 'percent', 'frequency']. histnorm of HS is {histnorm}"
)
subplot_titles = [
f"Number of data = {num}<br>Total count of number = {len(computation_times)}"
for num, computation_times in zip(num_data, computation_times_sequence)
]
fig = make_subplots(rows=1, cols=len(num_data), subplot_titles=subplot_titles)
# "count", "percent", "frequency"
histnorm_quara_to_plotly = {
"count": "",
"percent": "percent",
"frequency": "probability",
}
histnorm_param = histnorm_quara_to_plotly[histnorm]
for index, computation_times in enumerate(computation_times_sequence):
trace = go.Histogram(
x=computation_times,
xbins=dict(start=0),
histnorm=histnorm_param,
marker=dict(color="Blue"),
)
fig.append_trace(trace, 1, index + 1)
fig.update_layout(
title_text=title,
xaxis_title_text="Computation time(sec)",
yaxis_title_text=histnorm,
showlegend=False,
width=1200,
)
fig.show()
# common(show scatter)
[docs]def show_average_computation_times(
num_data: List[int],
computation_times_sequence: List[float],
num_of_runs,
title: str = None,
):
trace = go.Scatter(x=num_data, y=computation_times_sequence, mode="lines+markers")
data = [trace]
if title is None:
title = f"Computation times for estimates<br> number of runs for averaging = {num_of_runs}"
max_value = np.max(computation_times_sequence)
layout = go.Layout(
title=title,
xaxis_title_text="Number of data",
yaxis_title_text="Average of computation times(sec)",
# yaxis_min=0,
xaxis_type="log",
yaxis_range=[0, max_value * 1.1],
)
fig = go.Figure(data=data, layout=layout)
fig.show()
[docs]def make_empi_dists_mse_graph(
estimation_results: List["LinearEstimationResult"], true_object: "QOperation"
):
qtomography = estimation_results[0]._qtomography
num_schedules = qtomography.num_schedules
num_data = estimation_results[0].num_data
n_rep = len(estimation_results)
# Data
display_names = ["Empirical distributions"]
para = estimation_results[0].estimated_qoperation.on_para_eq_constraint
true_object_copied = _recreate_qoperation(true_object, para)
empi_dists = extract_empi_dists(estimation_results)
xs_list_list = empi_dists
ys_list_list = [[qtomography.calc_prob_dists(true_object_copied)] * n_rep] * len(
num_data
)
mses = []
error_bar_values = []
for i in range(len(num_data)):
mse, std = matrix_util.calc_mse_prob_dists(xs_list_list[i], ys_list_list[i])
mses.append(mse)
sigma = std
error_bar_value = sigma / np.sqrt(n_rep)
error_bar_values.append(error_bar_value)
mses_list = [mses]
error_bar_values_list = [error_bar_values]
# Analytical
true_mses = []
for num in num_data:
true_mse = qtomography.calc_mse_empi_dists_analytical(
true_object_copied, [num] * num_schedules
)
true_mses.append(true_mse)
mses_list.append(true_mses)
display_names.append(f"Analytical result")
fig = make_mses_graph(
mses=mses_list,
num_data=num_data,
names=display_names,
error_bar_values_list=error_bar_values_list,
yaxis_title_text="Mean squared error",
)
return fig