Source code for quara.minimization_algorithm.projected_gradient_descent_backtracking

import time
from typing import Callable, List

import numpy as np


from quara.loss_function.loss_function import LossFunction, LossFunctionOption
from quara.minimization_algorithm.minimization_algorithm import (
    MinimizationAlgorithm,
    MinimizationAlgorithmOption,
    MinimizationResult,
)
from quara.math import func_proj
from quara.protocol.qtomography.standard.standard_qtomography import StandardQTomography
from quara.settings import Settings


[docs]class ProjectedGradientDescentBacktrackingResult(MinimizationResult): def __init__( self, value: np.ndarray, computation_time: float = None, k: int = None, fx: List[np.ndarray] = None, x: List[np.ndarray] = None, y: List[np.ndarray] = None, alpha: List[float] = None, error_values: List[float] = None, ): super().__init__(value, computation_time) self._k: int = k self._fx: List[np.ndarray] = fx self._x: List[np.ndarray] = x self._y: List[np.ndarray] = y self._alpha: List[float] = alpha self._error_values: List[float] = error_values @property def k(self) -> int: """returns the number of iterations. Returns ------- int the number of iterations. """ return self._k @property def fx(self) -> List[np.ndarray]: """return the value of f(x) per iteration. Returns ------- List[np.ndarray] the value of f(x) per iteration. """ return self._fx @property def x(self) -> List[np.ndarray]: """return the x per iteration. Returns ------- List[np.ndarray] the x per iteration. """ return self._x @property def y(self) -> List[np.ndarray]: """return the y per iteration. Returns ------- List[np.ndarray] the y per iteration. """ return self._y @property def alpha(self) -> List[np.ndarray]: """return the alpha per iteration. Returns ------- List[np.ndarray] the alpha per iteration. """ return self._alpha @property def error_values(self) -> List[np.ndarray]: """return the error_values per iteration. Returns ------- List[np.ndarray] the error_values per iteration. """ return self._error_values
[docs]class ProjectedGradientDescentBacktrackingOption(MinimizationAlgorithmOption): def __init__( self, on_algo_eq_constraint: bool = True, on_algo_ineq_constraint: bool = True, var_start: np.ndarray = None, mu: float = None, gamma: float = 0.3, mode_stopping_criterion_gradient_descent: str = "single_difference_loss", num_history_stopping_criterion_gradient_descent: int = 1, mode_proj_order: str = "eq_ineq", eps: float = None, ): """Constructor Parameters ---------- on_algo_eq_constraint : bool, optional whether this algorithm needs on algorithm equality constraint, by default True on_algo_ineq_constraint : bool, optional whether this algorithm needs on algorithm inequality constraint, by default True var_start : np.ndarray, optional initial variable for the algorithm, by default None mu : float, optional algorithm option ``mu``, by default None gamma : float, optional algorithm option ``gamma``, by default 0.3 mode_stopping_criterion_gradient_descent : str, optional mode of stopping criterion for gradient descent, by default "single_difference_loss" num_history_stopping_criterion_gradient_descent : int, optional number of history to be used stopping criterion for gradient descent, by default 1 this must be a integer and greater than or equal to 1. mode_proj_order : str, optional the order in which the projections are performed, by default "eq_ineq". eps : float, optional algorithm option ``epsilon``, by default None """ super().__init__( on_algo_eq_constraint=on_algo_eq_constraint, on_algo_ineq_constraint=on_algo_ineq_constraint, var_start=var_start, ) if mu is None and var_start is not None: mu = 3 / (2 * np.sqrt(var_start.shape[0])) self._mu: float = mu self._gamma: float = gamma if not mode_stopping_criterion_gradient_descent in [ "single_difference_loss", "sum_absolute_difference_loss", "sum_absolute_difference_variable", "sum_absolute_difference_projected_gradient", ]: raise ValueError( f"unsupported 'mode_stopping_criterion_gradient_descent'={mode_stopping_criterion_gradient_descent}" ) self._mode_stopping_criterion_gradient_descent = ( mode_stopping_criterion_gradient_descent ) if type(num_history_stopping_criterion_gradient_descent) != int: raise ValueError( f"type(num_history_stopping_criterion_gradient_descent) is not int. type={type(num_history_stopping_criterion_gradient_descent)}" ) if num_history_stopping_criterion_gradient_descent < 1: raise ValueError( f"num_history_stopping_criterion_gradient_descent must be greater than or equal to 1. num_history_stopping_criterion_gradient_descent={num_history_stopping_criterion_gradient_descent}" ) self._num_history_stopping_criterion_gradient_descent = ( num_history_stopping_criterion_gradient_descent ) if not mode_proj_order in ["eq_ineq", "ineq_eq"]: raise ValueError(f"unsupported mode_proj_order={mode_proj_order}") self._mode_proj_order: str = mode_proj_order if eps is None: eps = Settings.get_atol() / 10.0 self._eps: float = eps @property def mu(self) -> float: """returns algorithm option ``mu``. Returns ------- float algorithm option ``mu``. """ return self._mu @property def gamma(self) -> float: """returns algorithm option ``gamma``. Returns ------- float algorithm option ``gamma``. """ return self._gamma @property def mode_stopping_criterion_gradient_descent(self) -> str: """returns mode of stopping criterion for gradient descent. Returns ------- str mode of stopping criterion for gradient descent. """ return self._mode_stopping_criterion_gradient_descent @property def num_history_stopping_criterion_gradient_descent(self) -> int: """returns number of history to be used stopping criterion for gradient descent. Returns ------- int number of history to be used stopping criterion for gradient descent. """ return self._num_history_stopping_criterion_gradient_descent @property def mode_proj_order(self) -> str: """returns the order in which the projections are performed. Returns ------- str the order in which the projections are performed. """ return self._mode_proj_order @property def eps(self) -> float: """returns algorithm option ``eps``. Returns ------- float algorithm option ``eps``. """ return self._eps
[docs]class ProjectedGradientDescentBacktracking(MinimizationAlgorithm): def __init__(self, func_proj: Callable[[np.ndarray], np.ndarray] = None): """Constructor Parameters ---------- func_proj : Callable[[np.ndarray], np.ndarray], optional function of projection, by default None """ super().__init__() self._func_proj: Callable[[np.ndarray], np.ndarray] = func_proj self._is_gradient_required: bool = True self._is_hessian_required: bool = False self._qt: StandardQTomography = None @property def func_proj(self) -> Callable[[np.ndarray], np.ndarray]: """returns function of projection. Returns ------- Callable[[np.ndarray], np.ndarray] function of projection. """ return self._func_proj
[docs] def set_constraint_from_standard_qt_and_option( self, qt: StandardQTomography, option: ProjectedGradientDescentBacktrackingOption, ) -> None: """sets constraint from StandardQTomography and Algorithm Option. Parameters ---------- qt : StandardQTomography StandardQTomography to set constraint. option : ProjectedGradientDescentBacktrackingOption Algorithm Option. """ self._qt = qt if self._func_proj is not None: return setting_info = self._qt.generate_empty_estimation_obj_with_setting_info() if ( option.on_algo_eq_constraint == True and option.on_algo_ineq_constraint == True ): self._func_proj = setting_info.func_calc_proj_physical( on_para_eq_constraint=setting_info.on_para_eq_constraint, mode_proj_order=option.mode_proj_order, ) elif ( option.on_algo_eq_constraint == True and option.on_algo_ineq_constraint == False ): self._func_proj = setting_info.func_calc_proj_eq_constraint( setting_info.on_para_eq_constraint ) elif ( option.on_algo_eq_constraint == False and option.on_algo_ineq_constraint == True ): self._func_proj = setting_info.func_calc_proj_ineq_constraint( setting_info.on_para_eq_constraint ) else: self._func_proj = func_proj.proj_to_self()
[docs] def is_loss_sufficient(self) -> bool: """returns whether the loss is sufficient. Returns ------- bool whether the loss is sufficient. """ if self.loss is None: return False elif self.loss.on_value is False: return False elif self.loss.on_gradient is False: return False else: return True
[docs] def is_option_sufficient(self) -> bool: """returns whether the option is sufficient. Returns ------- bool whether the option is sufficient. """ if self.option is None: return False elif self.option.mu is not None and self.option.mu <= 0: return False elif self.option.gamma is None or self.option.gamma <= 0: return False elif self.option.eps is None or self.option.eps <= 0: return False else: return True
[docs] def is_loss_and_option_sufficient(self) -> bool: """returns whether the loss and the option are sufficient. Returns ------- bool whether the loss and the option are sufficient. """ # validate when option.var_start exists if ( self.option is not None and self.option.var_start is not None and self.loss is not None ): num_var_option = self.option.var_start.shape[0] num_var_loss = self.loss.num_var if num_var_option != num_var_loss: return False return True
[docs] def optimize( self, loss_function: LossFunction, loss_function_option: LossFunctionOption, algorithm_option: ProjectedGradientDescentBacktrackingOption, on_iteration_history: bool = False, ) -> ProjectedGradientDescentBacktrackingResult: """optimizes using specified parameters. Parameters ---------- loss_function : LossFunction Loss Function loss_function_option : LossFunctionOption Loss Function Option algorithm_option : ProjectedGradientDescentBaseOption Projected Gradient Descent Base Algorithm Option on_iteration_history : bool, optional whether to return iteration history, by default False Returns ------- ProjectedGradientDescentBaseResult the result of the optimization. Raises ------ ValueError when ``on_value`` of ``loss_function`` is False. ValueError when ``on_gradient`` of ``loss_function`` is False. """ if loss_function.on_value == False: raise ValueError( "to execute ProjectedGradientDescentBase, 'on_value' of loss_function must be True." ) if loss_function.on_gradient == False: raise ValueError( "to execute ProjectedGradientDescentBase, 'on_gradient' of loss_function must be True." ) if algorithm_option.var_start is None: x_prev = ( self._qt.generate_empty_estimation_obj_with_setting_info() .generate_origin_obj() .to_var() ) else: x_prev = algorithm_option.var_start x_next = None if algorithm_option.mu: mu = algorithm_option.mu elif algorithm_option.var_start: mu = 3 / (2 * np.sqrt(len(algorithm_option.var_start))) elif self._qt: mu = 3 / (2 * np.sqrt(self._qt.num_variables)) else: raise ValueError("unable to set the algorithm option mu.") gamma = algorithm_option.gamma eps = algorithm_option.eps # variables for debug if on_iteration_history: start_time = time.time() fxs = [loss_function.value(x_prev)] xs = [x_prev] ys = [] alphas = [] error_values = [] k = 0 is_doing = True while is_doing: # shift variables if x_next is not None: x_prev = x_next y_prev = ( self.func_proj(x_prev - loss_function.gradient(x_prev) / mu) - x_prev ) alpha = 1.0 while self._is_doing_for_alpha(x_prev, y_prev, alpha, gamma, loss_function): alpha = 0.5 * alpha x_next = x_prev + alpha * y_prev k += 1 # calc error value depend on "mode_stopping_criterion_gradient_descent" if ( algorithm_option.mode_stopping_criterion_gradient_descent == "single_difference_loss" ): error_value = loss_function.value(x_prev) - loss_function.value(x_next) elif ( algorithm_option.mode_stopping_criterion_gradient_descent == "sum_absolute_difference_loss" ): error_value = np.abs( loss_function.value(x_prev) - loss_function.value(x_next) ) elif ( algorithm_option.mode_stopping_criterion_gradient_descent == "sum_absolute_difference_variable" ): error_value = np.sqrt(np.sum((x_prev - x_next) ** 2)) elif ( algorithm_option.mode_stopping_criterion_gradient_descent == "sum_absolute_difference_projected_gradient" ): error_value = np.sqrt(np.sum(y_prev ** 2)) error_values.append(error_value) # calc sum of error values # if num_history_stopping_criterion_gradient_descent = 1, then this is single sum sum_range = min( len(error_values), algorithm_option.num_history_stopping_criterion_gradient_descent, ) value = np.sum(error_values[-sum_range:]) is_doing = True if value > eps else False # variables for iteration history if on_iteration_history: fxs.append(loss_function.value(x_next)) xs.append(x_next) ys.append(y_prev) alphas.append(alpha) if on_iteration_history: computation_time = time.time() - start_time result = ProjectedGradientDescentBacktrackingResult( x_next, computation_time=computation_time, k=k, fx=fxs, x=xs, y=ys, alpha=alphas, error_values=error_values, ) return result else: result = ProjectedGradientDescentBacktrackingResult(x_next) return result
def _is_doing_for_alpha( self, x_prev: np.ndarray, y_prev: np.ndarray, alpha: float, gamma: float, loss_function: LossFunction, ) -> bool: left_side = loss_function.value(x_prev + alpha * y_prev) right_side = loss_function.value(x_prev) + gamma * alpha * ( np.dot(y_prev, loss_function.gradient(x_prev)) ) return left_side > right_side