__version__ = '2.0.2'
__author__ = "Avinash Kak (kak@purdue.edu)"
__date__ = '2023-June-14'
__url__ = 'https://engineering.purdue.edu/kak/distNonlinearLeastSquares/NonlinearLeastSquares-2.0.2.html'
__copyright__ = "(C) 2023 Avinash Kak. Python Software Foundation."
import numpy
import numpy.linalg
import scipy
import math
import os,sys,glob,re
import itertools
numpy.set_printoptions(precision=3)
class NonlinearLeastSquares(object):
def __init__(self, *args, **kwargs):
'constructor'
if args:
raise Exception('''The NonlinearLeastSquares constructor can only be called with '''
'''the following keyword arguments: X, Fvec, num_measurements, '''
'''num_parameters, initial_params_dict, jacobian_functionals_array, '''
'''initial_param_values_file, display_function''')
allowed_keys = 'initial_param_values_file','initial_params_dict','measured_data','max_iterations','delta_for_jacobian','delta_for_step_size','jacobian_functionals_array','num_measurements','num_parameters','display_function','debug'
keywords_used = kwargs.keys()
for keyword in keywords_used:
if keyword not in allowed_keys:
raise Exception("Wrong key used in constructor call --- perhaps spelling error")
X=Fvec=num_measurements=num_parameters=initial_param_values_file=initial_params_dict=measured_data_file=max_iterations=delta_for_jacobian=delta_for_step_size=jacobian_functionals_array=display_function=debug=None
if 'initial_params_dict' in kwargs: initial_params_dict=kwargs.pop('initial_params_dict')
if 'initial_param_values_file' in kwargs: initial_param_values_file=kwargs.pop('initial_param_values_file')
if 'max_iterations' in kwargs: max_iterations=kwargs.pop('max_iterations')
if 'delta_for_jacobian' in kwargs: delta_for_jacobian=kwargs.pop('delta_for_jacobian')
if 'delta_for_step_size' in kwargs: delta_for_step_size=kwargs.pop('delta_for_step_size')
if 'X' in kwargs: X=kwargs.pop('X')
if 'Fvec' in kwargs: X=kwargs.pop('Fvec')
if 'num_measurements' in kwargs: num_measurements=kwargs.pop('num_measurements')
if 'num_parameters' in kwargs: num_parameters=kwargs.pop('num_parameters')
if 'jacobian_functionals_array' in kwargs: jacobian_functionals_array=kwargs.pop('jacobian_functionals_array')
# if 'initial_param_values_file' in kwargs: initial_param_values_file=kwargs.pop('initial_param_values_file')
if 'debug' in kwargs: debug=kwargs.pop('debug')
if initial_params_dict and initial_param_values_file:
raise Exception("You must choose either the 'initial_param_values_file' or the 'initial_params_dict' option in the constructor, but not both")
self.X = X
self.Fvec = Fvec # is a column vector --- meaning a numpy matrix with just one column
self.X_BA = None
self.Fvec_BA = None
self.num_measurements = num_measurements
self.num_parameters = num_parameters
self.initial_params_dict = initial_params_dict
self.jacobian_functionals_array = jacobian_functionals_array
self.display_function = display_function
if max_iterations:
self.max_iterations = max_iterations
else:
raise Exception("The constructor must specify a value for max_iterations")
if delta_for_jacobian:
self.delta_for_jacobian = delta_for_jacobian
elif jacobian_functionals_array is None:
raise Exception("When not using 'jacobian_functionals_array', you must explicitly set 'delta_for_jacobian' in the constructor for NonlinearLeastSquares")
self.delta_for_step_size = delta_for_step_size
self.params_ordered_list = None
self.params_arranged_list = None # For scene reconstruction, we use arranged list and not ordered list
self.problem = 'surface_fitting' # set to "sfm" for solving "structure from camera motion" problems
self.debug = debug if debug else False
self.debug2 = False
def set_problem(self, prob):
'''
If you are using this module to solve structure from camera motion (sfm) problems, use this method
to set 'self.problem' to 'sfm_N' where 'N' is the number of world points you are tracking. This
is needed because sfm needs the specialized display function defined for the ProjectiveCamera class.
'''
self.problem = prob
def set_num_measurements(self, how_many_measurements):
print("\nNumber of measurements: ", how_many_measurements)
self.num_measurements = how_many_measurements
def set_num_parameters(self, how_many_parameters):
print("\nNumber of parameters: ", how_many_parameters)
self.num_parameters = how_many_parameters
def set_initial_params(self, initial_params_dict):
self.initial_params_dict = initial_params_dict
self.params_dict = initial_params_dict
def set_params_ordered_list(self, params_list):
self.params_ordered_list = sorted(params_list)
def set_params_arranged_list(self, params_list):
self.params_arranged_list = params_list
def set_X(self, X):
self.X = numpy.asmatrix(numpy.copy(X))
def set_X_BA(self, X_BA):
self.X_BA = numpy.asmatrix(numpy.copy(X_BA))
def set_Fvec(self, Fvector):
'''
This method supplies the NonlinearLeastSquares class with the prediction vector
whose each element is a functional form of the prediction in the observed data vector
X. Note that Fvec is a column vector --- meaning a numpy matrix with just one column.
'''
self.Fvec = Fvector
def set_Fvec_BA(self, Fvector_BA):
'''
You need to call this method for providing the NonlinearLeastSquares class with the
prediction vector if you are going to be using the bundle-adjustment capabilities
of the class.
'''
self.Fvec_BA = Fvector_BA
def set_jacobian_functionals_array(self, jacobian_functionals_array):
'''
This method expects for its argument an Nxp matrix of functionals for the partial
derivatives needed for the Jacobian matrix. N is the number of measurements in
the X vector and p is the number of parameters in the model. If you are using
nonlinear least-squares to fit optimal surfaces to noisy measurements over the
xy-plane, each element of the X vector would correspond to one such measurement at
some (x,y) coordinates. And an element the argument jacobian_functionals_array chararray
would correspond to the partial derivative of the model functional that already
has incorporated the (x,y) coordinates corresponding to that row and that is
a partial derivative of the model with respect to the parameter corresponding to
the column.
'''
self.jacobian_functionals_array = jacobian_functionals_array # a chararray of size Nxp
def set_display_function(self, display_function):
self.display_function = display_function
def set_debug(self, debug):
self.debug = debug
###%%%
############################# Nonlinear Least-Squares with Basic Gradient Descent ############################
def grad_descent(self):
'''
This is an implementation of the basic gradient descent algorithm for nonlinear least-squares as described
in my Lecture 13 notes at the lecture-notes website for Purdue University's ECE661: Computer Vision
'''
error_norm_with_iteration = []
delta_for_jacobian = self.delta_for_jacobian if self.jacobian_functionals_array is None else None
if self.delta_for_step_size is not None:
delta_for_step_size = self.delta_for_step_size
else:
raise Exception("You must set the 'delta_for_step_size' option in the constructor for the gradient-descent algorithm")
num_elements = len(self.Fvec)
num_measurements = len(self.X)
params_list = self.params_ordered_list if self.params_ordered_list is not None else self.params_arranged_list
num_params = len(params_list)
current_param_values = [self.params_dict[param] for param in params_list]
current_param_values = numpy.matrix(current_param_values).T
current_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
current_fit_to_measurements[i,0] = \
eval(self._eval_functional_element(self.Fvec[i,0], self.initial_params_dict))
if self.debug:
print("\ncurrent_fit_to_measurements:")
print(str(current_fit_to_measurements))
current_error = self.X - current_fit_to_measurements
if self.debug:
print("\ncurrent error:")
print(str(current_error))
print("current error shape: %s" % str(current_error.shape))
current_error_norm = numpy.linalg.norm(self.X - current_fit_to_measurements)
if current_error_norm < 1e-12:
print("\nCurrent error norm: %.10f" % current_error_norm)
print('''\nLooks like your initial choices for the parameters are perfect. '''
'''Perhaps there is nothing to be gained by invoking nonlinear least-squares '''
'''on your problem.''')
sys.exit(1)
error_norm_with_iteration.append(current_error_norm)
if self.debug:
print("current error norm: %s" % str(current_error_norm))
new_param_values = new_fit_to_measurements = new_error_norm = None
iteration_index = 0
for iteration_index in range(self.max_iterations):
if self.debug:
print("\n\n ======================================== GD ITERATION: %d\n\n" % iteration_index)
jacobian = numpy.asmatrix(numpy.zeros((num_measurements, num_params), dtype=float))
if self.jacobian_functionals_array is not None:
for i in range(num_measurements):
params_dict_local = {params_list[i] : current_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug is True and i == 0:
print("\ncurrent values for parameters: %s" % str(sorted(params_dict_local.items())))
for j in range(num_params):
jacobian[i,j] = eval(self._eval_functional_element(self.jacobian_functionals_array[i,j], params_dict_local))
else:
for i in range(num_measurements):
params_dict_local = {params_list[i] : current_param_values[i].tolist()[0][0] for i in range(num_params)}
for j in range(num_params):
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
param = self.params_ordered_list[j] if self.params_ordered_list is not None else self.params_arranged_list[j]
evaled_element1 = self._eval_functional_element(self.Fvec[i][0], params_dict_local)
incremented_params_dict_local[param] = params_dict_local[param] + delta_for_jacobian
evaled_element2 = self._eval_functional_element(self.Fvec[i][0], incremented_params_dict_local)
jacobian[i,j] = (eval(evaled_element2) - eval(evaled_element1)) / delta_for_jacobian
params_dict_local = None
if self.debug:
print("jacobian:")
print(str(jacobian))
# print("jacobian shape: %s" % str(jacobian.shape))
new_param_values = current_param_values + 2 * delta_for_step_size * (jacobian.T * current_error)
if self.debug:
print("\nnew parameter values:")
print(str(new_param_values.T))
new_params_dict = {params_list[i] : new_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug:
print("new_params_dict: %s" % str(new_params_dict))
new_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
new_fit_to_measurements[i,0] = eval(self._eval_functional_element(self.Fvec[i][0], new_params_dict))
if self.debug:
print("new_fit_to_measurements:")
print(str(new_fit_to_measurements))
new_error = self.X - new_fit_to_measurements
if self.debug:
print("\nnew error:")
print(str(new_error))
new_error_norm = numpy.linalg.norm(self.X - new_fit_to_measurements)
if self.debug:
print("\nnew error norm: %s" % str(new_error_norm))
if new_error_norm > error_norm_with_iteration[-1]:
break
error_norm_with_iteration.append(new_error_norm)
if self.debug:
print("\nerror norms with iterations: %s" % str(error_norm_with_iteration))
if self.display_function is not None and iteration_index % int(self.max_iterations/5.0) == 0:
self.display_function(new_fit_to_measurements, new_error_norm, iteration_index)
current_param_values = new_param_values
if self.debug:
print("\nerror norms with iterations: %s" % str(error_norm_with_iteration))
print("\niterations used: %d" % iteration_index)
print("\n\nfinal values for the parameters: ")
print(str(new_param_values))
if self.debug is True and iteration_index == self.max_iterations - 1:
print("\n\nWARNING: max iterations reached without getting to the minimum")
if self.display_function:
self.display_function(new_fit_to_measurements, new_error_norm, iteration_index)
result = {"error_norms_with_iterations" : error_norm_with_iteration,
"number_of_iterations" : iteration_index,
"parameter_values" : new_param_values}
return result
###%%%
############# Nonlinear Least-Squares with the Levenberg-Marquardt Algorithm for Gradient Descent ############
def leven_marq(self):
'''
This is an implementation of the Levenberg-Marquardt algorithm for nonlinear least-squares as described
in my Lecture 13 notes at the lecture-notes website for Purdue University's ECE661: Computer Vision
This function is used the following scripts in the ExamplesOptimizedSurfaceFit directory of the distro:
leven_marq.py
leven_marq_with_partial_derivatives.py
It is important to note that the above two scripts do NOT call leven_marq() function directly. AS
stated in the main document page for the NonlinearLeastSquares module, the code in the file
NonlinearLeastSquares.py is written in a domain agnostic manner. So you need a domain adaptation
class that knows how to package the arguments for calling leven_marq(). For the case of the two scripts
listed above, that domain-specific class in the distro is OptimizedSurfaceFit. It is a co-class of
the main module class NonlinearLeastSquares in the distribution.
The function leven_marq() defined here is ALSO used in the following two scripts
sfm_with_calibrated_cameras_translations_only.py
sfm_with_uncalibrated_cameras_translations_only.py
in the ExamplesStructureFromCameraMotion directory of the distribution. Again, these example scripts
do NOT directly call the leven_marq() function. As shown in the two scripts, they must first construct
an instance of the ProjectiveCamera class that knows how to bundle the arguments together for calling
the leven_marq() function.
'''
if os.path.isdir("figs"):
list(map(os.remove, glob.glob('figs/*.png')))
else:
os.mkdir("figs")
error_norm_with_iteration = []
error_norm_per_measurement_with_iteration = []
alambda_with_iteration = []
delta_for_jacobian = self.delta_for_jacobian if self.jacobian_functionals_array is None else None
num_elements = len(self.Fvec)
num_measurements = len(self.X)
params_list = self.params_ordered_list if self.params_ordered_list is not None else self.params_arranged_list
num_params = len(params_list)
current_param_values = [self.params_dict[param] for param in params_list]
current_param_values = numpy.matrix(current_param_values).T
current_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
current_fit_to_measurements[i,0] = eval(self._eval_functional_element(self.Fvec[i,0], self.initial_params_dict))
current_error = self.X - current_fit_to_measurements
print("\n\ncurrent error (before the iterations):")
print(current_error.flatten().tolist()[0])
current_error_norm = numpy.linalg.norm(self.X - current_fit_to_measurements)
error_norm_with_iteration.append(current_error_norm)
current_error_norm_per_measurement = current_error_norm / math.sqrt(num_measurements)
print("\n\ncurrent error norm per measurement before iterations: %s" % str(current_error_norm_per_measurement))
if current_error_norm_per_measurement < 1e-10:
print("\n\nCurrent error norm: %.10f" % current_error_norm_per_measurement)
print('''\n\nLooks like your initial choices for the parameters are perfect. Perhaps there is nothing'''
'''to be gained by invoking nonlinear least-squares on your problem.''')
sys.exit(1)
error_norm_per_measurement_with_iteration.append(current_error_norm_per_measurement)
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None and self.problem.startswith("sfm"):
predicted_pixel_coordinates = current_fit_to_measurements.flatten().tolist()[0]
predicted_pixels = [(predicted_pixel_coordinates[2*x], predicted_pixel_coordinates[2*x+1]) for x in range(len(predicted_pixel_coordinates) // 2)]
self.display_function(predicted_pixels, None, current_error_norm_per_measurement)
else:
self.display_function(current_fit_to_measurements, current_error_norm_per_measurement, -1)
new_param_values=new_fit_to_measurements=new_error_norm=new_error_norm_per_measurement=None
iteration_index = 0
self.alambda = None
self.rho = None
alambda = None
rho = None
# An important feature of LM is that ONLY SOME OF THE ITERATIONS cause a reduction in
# the error vector (which is the difference between the measured data and its predicted
# values from the current knowledge of the parameters), the following list stores just
# those iteration index values that were productive in reducing this error. This list is
# useful for deciding when to display the partial results.
productive_iteration_index_values = []
need_fresh_jacobian_flag = True
A = g = None
iterations_used = None
best_estimated_structure = best_error_norm = best_error_norm_per_measurement=None
for iteration_index in range(self.max_iterations):
if need_fresh_jacobian_flag is True:
print("\n\nCalculating a fresh jacobian\n\n")
jacobian = numpy.asmatrix(numpy.zeros((num_measurements, num_params), dtype=float))
for i in range(num_measurements):
params_dict_local = {params_list[i] : current_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.jacobian_functionals_array is not None:
'''
A functional form was supplied for the Jacobian. Use it.
'''
for j in range(num_params):
jacobian[i,j] = \
eval(self._eval_functional_element(self.jacobian_functionals_array[i,j], params_dict_local))
else:
'''
Estimate your own Jacobian
'''
for i in range(num_measurements):
for j in range(num_params):
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
param = self.params_ordered_list[j] if self.params_ordered_list is not None else self.params_arranged_list[j]
evaled_element1 = self._eval_functional_element(self.Fvec[i,0], params_dict_local)
incremented_params_dict_local[param] = incremented_params_dict_local[param] + delta_for_jacobian
evaled_element2 = self._eval_functional_element(self.Fvec[i,0], incremented_params_dict_local)
jacobian[i,j] = (eval(evaled_element2) - eval(evaled_element1)) / delta_for_jacobian
print("\n\nFinished calculating the Jacobian")
if self.debug:
print("\njacobian:")
print(jacobian)
A = jacobian.T * jacobian
g = jacobian.T * current_error
if iteration_index == 0:
JtJ_max = max(A.diagonal().tolist()[0])
print("\n\nmax on the diagonal on J^T.J: %s" % str(JtJ_max))
self.alambda = JtJ_max / 1000
alambda = self.alambda
print("\n\nWe start with alambda = %f" % self.alambda)
if self.debug:
print("\ng vector for iteration_index: %d" % iteration_index)
print(str(g.T))
if abs(numpy.max(g)) < 0.0000001:
print("absolute value of the largest component of g below threshold --- quitting iterations")
break
B = self._pseudoinverse(A + alambda * numpy.asmatrix(numpy.identity(num_params)))
new_delta_param = B * g
new_param_values = current_param_values + new_delta_param
if self.debug:
print("\nnew parameter values:")
print(str(new_param_values.T))
new_params_dict = {params_list[i] : new_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug:
print("\nnew_params_dict: %s" % str(sorted(new_params_dict.items())))
new_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
new_fit_to_measurements[i,0] = eval(self._eval_functional_element(self.Fvec[i,0], new_params_dict))
newly_projected_pixel_coordinates = new_fit_to_measurements.flatten().tolist()[0]
newly_projected_pixels = [(newly_projected_pixel_coordinates[2*x], newly_projected_pixel_coordinates[2*x+1]) for x in range(len(newly_projected_pixel_coordinates) // 2)]
if self.debug:
print("\nnew_fit_to_measurements (shown as transpose):")
print(str(new_fit_to_measurements.T))
new_error = self.X - new_fit_to_measurements
if self.debug:
print("\nnew error magnitudes at each measurement:")
print(str(new_error.flatten().tolist()[0]))
new_error_norm = numpy.linalg.norm(self.X - new_fit_to_measurements)
new_error_norm_per_measurement = new_error_norm / math.sqrt(num_measurements)
print("\nnew error norm per measurement: %s" % str(new_error_norm_per_measurement))
rho = ( error_norm_with_iteration[-1] ** 2 - new_error_norm ** 2 ) / \
(new_delta_param.T * g + alambda * new_delta_param.T * new_delta_param)
if rho <= 0.0:
need_fresh_jacobian_flag = False
alambda = alambda * max( [1.0/3.0, 1.0 - (2.0 * rho - 1.0) ** 3] ) ## BIZARRE that a matrix is returned
alambda = alambda.tolist()[0][0]
if alambda > 1e12:
if self.debug:
print("\nIterations terminated because alambda exceeded limit")
break
if self.debug:
print("\nNO change in parameters for iteration_index: %d with alambda = %f" % (iteration_index, alambda))
continue
else:
need_fresh_jacobian_flag = True
if self.debug:
print("\n\n================================================Showing LM results for iteration: %d\n"
% (iteration_index+1))
productive_iteration_index_values.append(iteration_index)
alambda = self.alambda
error_norm_with_iteration.append(new_error_norm)
error_norm_per_measurement_with_iteration.append(new_error_norm_per_measurement)
print("\n\nerror norms per measurement for all iterations: %s" % str(error_norm_per_measurement_with_iteration))
current_param_values = new_param_values
best_param_values = new_param_values
best_predicted_pixels = newly_projected_pixels
iterations_used = iteration_index + 1
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
if self.problem.startswith("sfm"):
num_structure_points = int(self.problem.split("_")[1])
estimated_structure = new_param_values[-3*num_structure_points:].flatten().tolist()[0]
estimated_structure = [estimated_structure[3*i:3*i+3] for i in range(num_structure_points)]
best_estimated_structure = estimated_structure
best_error_norm_per_measurement = new_error_norm_per_measurement
self.display_function(best_predicted_pixels, estimated_structure, new_error_norm_per_measurement, len(productive_iteration_index_values)-1)
else:
self.display_function(new_fit_to_measurements, new_error_norm_per_measurement, len(productive_iteration_index_values)-1)
if self.debug:
# print("\nerror norms for all iterations: %s" % str(error_norm_with_iteration))
print("\nerror norms for all iterations: %s" % str(error_norm_per_measurement_with_iteration))
print("\niterations used: %d" % (len(productive_iteration_index_values) - 1))
print("\nproductive iteration index values: %s" % str(productive_iteration_index_values))
print("\n\nfinal values for the parameters: ")
print(str(new_param_values.T))
if self.debug is True and iteration_index == self.max_iterations - 1:
print("\n\nWARNING: max iterations reached without getting to the minimum")
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
if self.problem.startswith("sfm"):
self.display_function(best_predicted_pixels, best_estimated_structure, best_error_norm_per_measurement,
len(productive_iteration_index_values)-1)
else:
self.display_function(new_fit_to_measurements, new_error_norm_per_measurement, len(productive_iteration_index_values)-1)
result = {"error_norms_with_iterations" : error_norm_per_measurement_with_iteration,
# "number_of_iterations" : len(productive_iteration_index_values),
"number_of_iterations" : iterations_used,
"parameter_values" : best_param_values}
return result
def leven_marq_v1_5(self):
"""
This is the implementation of the leven_marq() function as it existed in Version 1.5. On account of the
fact that I made significant changes to this function in Version 2.0.0 of the module, I have retained the
old version for the old-time users of my module.
"""
if os.path.isdir("figs"):
list(map(os.remove, glob.glob('figs/*.png')))
else:
os.mkdir("figs")
error_norm_with_iteration = []
delta_for_jacobian = self.delta_for_jacobian if self.jacobian_functionals_array is None else None
# delta_for_step_size = self.delta_for_step_size if self.jacobian_functionals_array is None else None
num_elements = len(self.Fvec)
num_measurements = len(self.X)
params_list = self.params_ordered_list if self.params_ordered_list is not None else self.params_arranged_list
num_params = len(params_list)
current_param_values = [self.params_dict[param] for param in params_list]
current_param_values = numpy.matrix(current_param_values).T
current_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
current_fit_to_measurements[i,0] = \
eval(self._eval_functional_element(self.Fvec[i,0], self.initial_params_dict))
if self.debug:
print("\nleven_marq: current_fit_to_measurements (shown as transpose):")
print(str(current_fit_to_measurements.T))
current_error = self.X - current_fit_to_measurements
# if self.debug:
print("\ncurrent error (shown as transpose):")
print(str(current_error.T))
current_error_norm = numpy.linalg.norm(self.X - current_fit_to_measurements)
if current_error_norm < 1e-12:
print("\nCurrent error norm: %.10f" % current_error_norm)
print('''\nLooks like your initial choices for the parameters are perfect. '''
'''Perhaps there is nothing to be gained by invoking nonlinear least-squares '''
'''on your problem.''')
sys.exit(1)
error_norm_with_iteration.append(current_error_norm)
if self.debug:
print("\ncurrent error norm: %s" % str(current_error_norm))
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
self.display_function(current_fit_to_measurements, current_error_norm, -1)
new_param_values = new_fit_to_measurements = new_error_norm = None
iteration_index = 0
alambda = 0.001
# If 10 CONSECUTIVE STEPS in the parameter hyperplane turn out to the wrong choices,
# we terminate the iterations. If you want to change the number of consecutively
# occurring stops, you have to make changes at three different places in this file,
# including the statement shown below.
# wrong_direction_flags = [0] * 10
wrong_direction_flags = [0] * 20
# An important feature of LM is that ONLY SOME OF THE ITERATIONS cause a reduction in
# the error vector (which is the difference between the measured data and its predicted
# values from the current knowledge of the parameters), the following list stores just
# those iteration index values that were productive in reducing this error. This list is
# useful for deciding when to display the partial results.
productive_iteration_index_values = [-1]
for iteration_index in range(self.max_iterations):
jacobian = numpy.asmatrix(numpy.zeros((num_measurements, num_params), dtype=float))
if self.jacobian_functionals_array is not None:
'''
A functional form was supplied for the Jacobian. Use it.
'''
for i in range(num_measurements):
params_dict_local = {params_list[i] : current_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug is True and i == 0:
print("\ncurrent values for parameters: %s" % str(sorted(params_dict_local.items())))
for j in range(num_params):
jacobian[i,j] = \
eval(self._eval_functional_element(self.jacobian_functionals_array[i,j], params_dict_local))
else:
'''
Estimate your own Jacobian
'''
for i in range(num_measurements):
params_dict_local = {params_list[i] : current_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug is True and i == 0:
print("\ncurrent values for parameters: %s" % str(sorted(params_dict_local.items())))
for j in range(num_params):
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
param = self.params_ordered_list[j] if self.params_ordered_list is not None else self.params_arranged_list[j]
evaled_element1 = self._eval_functional_element(self.Fvec[i,0], params_dict_local)
incremented_params_dict_local[param] = params_dict_local[param] + delta_for_jacobian
evaled_element2 = self._eval_functional_element(self.Fvec[i,0], incremented_params_dict_local)
jacobian[i,j] = (eval(evaled_element2) - eval(evaled_element1)) / delta_for_jacobian
params_dict_local = None
if self.debug:
print("\njacobian:")
print(str(jacobian))
# print("\njacobian shape: %s" % str(jacobian.shape))
A = jacobian.T * jacobian
g = jacobian.T * current_error
if self.debug:
print("\ng vector for iteration_index: %d" % iteration_index)
print(str(g.T))
if abs(numpy.max(g)) < 0.0000001:
print("absolute value of the largest component of g below threshold --- quitting iterations")
break
B = numpy.linalg.inv(A + alambda * numpy.asmatrix(numpy.identity(num_params)))
new_delta_param = alambda * g if iteration_index == 0 else B * g
new_param_values = current_param_values + new_delta_param
if self.debug:
print("\nnew parameter values:")
print(str(new_param_values.T))
new_params_dict = {params_list[i] : new_param_values[i].tolist()[0][0] for i in range(num_params)}
if self.debug:
print("\nnew_params_dict: %s" % str(sorted(new_params_dict.items())))
new_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X))
for i in range(num_measurements):
new_fit_to_measurements[i,0] = eval(self._eval_functional_element(self.Fvec[i,0], new_params_dict))
if self.debug:
print("\nnew_fit_to_measurements (shown as transpose):")
print(str(new_fit_to_measurements.T))
new_error = self.X - new_fit_to_measurements
if self.debug:
print("\nnew error (shown as transpose):")
print(str(new_error.T))
new_error_norm = numpy.linalg.norm(self.X - new_fit_to_measurements)
if self.debug:
print("\nnew error norm: %s" % str(new_error_norm))
if new_error_norm >= error_norm_with_iteration[-1]:
alambda *= 10
wrong_direction_flags.append(1)
# wrong_direction_flags = wrong_direction_flags[-10:]
wrong_direction_flags = wrong_direction_flags[-20:]
# if alambda > 1e11:
if alambda > 1e9:
if self.debug:
print("\nIterations terminated because alambda exceeded limit")
break
if all(x == 1 for x in wrong_direction_flags):
if self.debug:
print("\n\nTERMINATING DESCENT BECAUSE reached a max of 20 consecutive bad steps")
break
if self.debug:
print("\nNO change in parameters for iteration_index: %d with alambda = %f" % (iteration_index, alambda))
continue
else:
if self.debug:
print("\n\n================================================ LM ITERATION: %d"
% len(productive_iteration_index_values))
print()
productive_iteration_index_values.append(iteration_index)
wrong_direction_flags.append(0)
# wrong_direction_flags = wrong_direction_flags[-10:]
wrong_direction_flags = wrong_direction_flags[-20:]
alambda = 0.001
error_norm_with_iteration.append(new_error_norm)
if self.debug:
print("\nerror norms with iterations: %s" % str(error_norm_with_iteration))
current_param_values = new_param_values
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
if len(productive_iteration_index_values) % 2 == 0:
self.display_function(new_fit_to_measurements, new_error_norm, len(productive_iteration_index_values)-1)
if self.debug:
print("\nerror norms with iterations: %s" % str(error_norm_with_iteration))
print("\niterations used: %d" % (len(productive_iteration_index_values) - 1))
print("\nproductive iteration index values: %s" % str(productive_iteration_index_values))
print("\n\nfinal values for the parameters: ")
print(str(new_param_values.T))
if self.debug is True and iteration_index == self.max_iterations - 1:
print("\n\nWARNING: max iterations reached without getting to the minimum")
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function:
self.display_function(new_fit_to_measurements, new_error_norm, len(productive_iteration_index_values))
result = {"error_norms_with_iterations" : error_norm_with_iteration,
"number_of_iterations" : len(productive_iteration_index_values) - 1,
"parameter_values" : new_param_values}
return result
###%%%
################ Nonlinear Least-Squares for SfM (Structure from Motion) with Bundle Adjustment #############
def bundle_adjust(self, *args, **kwargs):
"""
This is an implementation of the "bundle adjustment" version of the Levenberg-Marquardt algorithm for nonlinear
least-squares. Bundle adjustment takes advantage of the sparsity of the Jacobian that one sees in
applications such as estimating the scene structure with the images recorded with uncalibrated cameras.
The implementation shown here is based on the now celebrated paper "SBA: A Software Package for Generic
Sparse Bundle Adjustment" by Manolis Lourakis and Antonis Argyros that appeared in ACM Transactions on
Mathematical Software, March 2009.
This function is used in the following scripts
bundle_adjust_sfm_with_uncalibrated_cameras_translations_only.py
in the ExamplesStructureFromCameraMotion directory of the distribution.
Note that since bundle_adjust() is written in a generic manner, it is not called directly by the example
scripts listed above. As shown in the scripts, you need to first construct an instance of the class
ProjectiveCamera that is a co-class of the main module class NonlinearLeastSquares in the distribution.
"""
if args: raise Exception("The bundle_adjust can only be called with keyword arguments")
allowed_keys = 'num_camera_params,num_structure_elements,num_cameras,num_params_per_camera,num_measurements_per_camera,initial_val_all_params'
num_cameras=num_world_points=num_camera_params=num_structure_elements=num_cameras=num_cam_params_per_camera=num_measurements_per_camera=initial_val_all_params=None
num_camera_params = kwargs.pop('num_camera_params')
num_structure_elements = kwargs.pop('num_structure_elements') ## they remain the same for all cams
num_cameras = kwargs.pop('num_cameras')
num_cam_params_per_camera = kwargs.pop('num_cam_params_per_camera')
num_measurements_per_camera = kwargs.pop('num_measurements_per_camera')
initial_val_all_params = kwargs.pop('initial_val_all_params')
error_norm_with_iteration = []
error_norm_per_measurement_with_iteration = []
delta_for_jacobian = self.delta_for_jacobian
Fvec_as_list = self.Fvec_BA[:,0].tolist()
num_Fvec_elements = len(Fvec_as_list)
num_world_points = num_measurements_per_camera // 2
params_list = self.params_arranged_list
num_params = len(params_list)
num_measurements = len(self.X_BA)
params_for_camera_dict = {i : None for i in range(num_cameras)}
initial_param_vals_for_cam = {i : None for i in range(num_cameras)}
current_param_vals_for_cam = {i : None for i in range(num_cameras)}
for c in range(num_cameras):
params_for_camera_dict[c] = params_list[c*num_cam_params_per_camera : (c+1)*num_cam_params_per_camera]
initial_param_vals_for_cam[c] = initial_val_all_params[c*num_cam_params_per_camera : (c+1)*num_cam_params_per_camera]
structure_params = params_list[num_cameras * num_cam_params_per_camera : ]
initial_param_vals_for_structure = initial_val_all_params[ num_cameras * num_cam_params_per_camera : ]
# We now place all the initial values for the params in the current_param list for iterative processing
current_param_values = initial_val_all_params
current_param_values_vec = numpy.matrix(current_param_values).T
current_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X_BA))
for data_index in range(num_measurements_per_camera * num_cameras):
current_fit_to_measurements[data_index,0] = eval(self._eval_functional_element(self.Fvec_BA[data_index,0], self.initial_params_dict))
if self.debug:
print("\nbundle_adjust: current_fit_to_measurements (shown as transpose):")
print(str(current_fit_to_measurements.T))
current_error = self.X_BA - current_fit_to_measurements
print("\ncurrent error (this is before the iterations):")
print(current_error.flatten().tolist()[0])
current_error_norm = numpy.linalg.norm(self.X_BA - current_fit_to_measurements)
error_norm_with_iteration.append(current_error_norm)
current_error_norm_per_measurement = current_error_norm / math.sqrt(num_measurements)
print("\n\ncurrent error norm per measurement before iterations: %s" % str(current_error_norm_per_measurement))
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None and self.problem.startswith("sfm"):
predicted_pixel_coordinates = current_fit_to_measurements.flatten().tolist()[0]
predicted_pixels = [(predicted_pixel_coordinates[2*x], predicted_pixel_coordinates[2*x+1]) for x in range(len(predicted_pixel_coordinates) // 2)]
self.display_function(predicted_pixels, None, current_error_norm_per_measurement)
if current_error_norm_per_measurement < 1e-9:
print("\nCurrent error norm: %.10f" % current_error_norm)
print('''\nLooks like your initial choices for the parameters are perfect. '''
'''Perhaps there is nothing to be gained by invoking nonlinear least-squares '''
'''on your problem.''')
sys.exit(1)
error_norm_per_measurement_with_iteration.append(current_error_norm_per_measurement)
# Next we need to calculate what L&A refer to as \epsilon_ij, which is the error associated with
# the i-th point in the j_th camera. Note that the first two elements of "current_error" is for the
# first world point in the first camera. I believe that this 2-element vector would be \epsilon_11.
# The next two elements of "current_error" are for the first element in the second camera. These
# would be represented by \epsilon_12, and so on.
epsilon_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
current_error_as_list = current_error.flatten().tolist()[0]
epsilons_arranged_by_points = [current_error_as_list[pt*2*num_cameras : (pt+1)*2*num_cameras] for pt in range(num_world_points)]
if self.debug2:
print("\n\nepsilons_arranged_by_points: %s" % str(epsilons_arranged_by_points))
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
epsilon_array[point_index][cam_index] = numpy.matrix([epsilons_arranged_by_points[point_index][2*cam_index],
epsilons_arranged_by_points[point_index][2*cam_index+1]]).T
if self.debug2:
print("\n\nepsilon_ij array of vectors:")
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
print(epsilon_array[point_index][cam_index])
new_param_values = new_fit_to_measurements = new_error_norm = None
iteration_index = 0
## We now define for each camera two matrices that are denoted A and B in the paper by Lourakis and Argyros.
## There is an A matrix for each point and each camera, as is the case with the B matrices also. We refer
## to the array of all A matrices as the Argyros array. And we refer to the array of all B matrices as the
## Lourakis array:
Argyros_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
Lourakis_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
productive_iteration_index_values = []
best_estimated_structure = best_error_norm = None
need_fresh_jacobian_flag = True
need_sanity_check = False
self.alambda = None
self.rho = None
alambda = None
rho = None
# self.debug2 = True
self.debug2 = False
iterations_used = None
for iteration_index in range(self.max_iterations):
if need_fresh_jacobian_flag is True:
if need_sanity_check is True:
print("\n\n|||||||||||||||||||||||| entering sanity checking code ||||||||||||||||||||||||||||||||||||")
if num_cameras > 6 or num_world_points > 9:
sys.exit("It is best to run sanity check on cases involving less than six cameras and less than nine points")
jacobian = numpy.asmatrix(numpy.zeros((num_measurements, num_params), dtype=float))
params_dict_local = {params_list[i] : current_param_values[i] for i in range(num_params)}
for i in range(num_measurements):
for j in range(num_params):
param = self.params_arranged_list[j]
evaled_element1 = self._eval_functional_element(self.Fvec_BA[i,0], params_dict_local)
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
incremented_params_dict_local[param] = incremented_params_dict_local[param] + delta_for_jacobian
evaled_element2 = self._eval_functional_element(self.Fvec_BA[i,0], incremented_params_dict_local)
jacobian[i,j] = (eval(evaled_element2) - eval(evaled_element1)) / delta_for_jacobian
print("\n\nJacobian:")
print(jacobian)
print("\nsize of the jacobian: %s" % str(jacobian.shape))
sanity_A = jacobian.T * jacobian
sanity_g = jacobian.T * current_error
sanity_JtJ_max = max(sanity_A.diagonal().tolist()[0])
print("\n\nmax on the diagonal on J^T.J: %s" % str(sanity_JtJ_max))
print("|||||||||||||||||||||||| exiting sanity checking code ||||||||||||||||||||||||||||||||||||\n\n\n")
print("\n\n---------------------------Running SBA in iteration: %d---------------\n" % iteration_index)
params_dict_local = {params_list[i] : current_param_values[i] for i in range(num_params)}
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
params_for_cam = params_for_camera_dict[cam_index]
A_matrix_for_cam_and_point = numpy.asmatrix(numpy.zeros(shape=(2, len(params_for_cam))))
B_matrix_for_cam_and_point = numpy.asmatrix(numpy.zeros(shape=(2, 3))) # 2 for (x,y), 3 for (X,Y,Z)
x_cord_prediction = self.Fvec_BA[2*num_cameras*point_index + 2*cam_index,0]
y_cord_prediction = self.Fvec_BA[2*num_cameras*point_index + 2*cam_index + 1,0]
for param_index,param in enumerate(params_for_cam):
evaled_x_cord_predi = eval(self._eval_functional_element(x_cord_prediction, params_dict_local))
evaled_y_cord_predi = eval(self._eval_functional_element(y_cord_prediction, params_dict_local))
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
incremented_params_dict_local[param] += delta_for_jacobian
incremented_evaled_x_cord_predi = eval(self._eval_functional_element(x_cord_prediction,
incremented_params_dict_local))
incremented_evaled_y_cord_predi = eval(self._eval_functional_element(y_cord_prediction,
incremented_params_dict_local))
A_matrix_for_cam_and_point[0,param_index] = (incremented_evaled_x_cord_predi -
evaled_x_cord_predi) / delta_for_jacobian
A_matrix_for_cam_and_point[1,param_index] = (incremented_evaled_y_cord_predi -
evaled_y_cord_predi) / delta_for_jacobian
Argyros_array[point_index][cam_index] = A_matrix_for_cam_and_point
for cam_index in range(num_cameras):
for point_index in range(num_world_points):
B_matrix_for_cam_and_point = numpy.asmatrix(numpy.zeros(shape=(2, 3))) # 2 for (x,y), 3 for (X,Y,Z)
x_cord_prediction = self.Fvec_BA[2*num_cameras*point_index + 2*cam_index,0]
y_cord_prediction = self.Fvec_BA[2*num_cameras*point_index + 2*cam_index + 1,0]
evaled_x_cord_predi = eval(self._eval_functional_element(x_cord_prediction, params_dict_local))
evaled_y_cord_predi = eval(self._eval_functional_element(y_cord_prediction, params_dict_local))
for param_index,param in enumerate(structure_params[point_index*3:point_index*3+3]):
incremented_params_dict_local = {param : params_dict_local[param] for param in params_dict_local}
incremented_params_dict_local[param] += delta_for_jacobian
incremented_evaled_x_cord_predi = eval(self._eval_functional_element(x_cord_prediction,
incremented_params_dict_local))
incremented_evaled_y_cord_predi = eval(self._eval_functional_element(y_cord_prediction,
incremented_params_dict_local))
B_matrix_for_cam_and_point[0,param_index] = (incremented_evaled_x_cord_predi -
evaled_x_cord_predi) / delta_for_jacobian
B_matrix_for_cam_and_point[1,param_index] = (incremented_evaled_y_cord_predi -
evaled_y_cord_predi) / delta_for_jacobian
Lourakis_array[point_index][cam_index] = B_matrix_for_cam_and_point
if self.debug2:
print("\n\nShowing all A matrices (the Argyros array of matrices):")
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
print(Argyros_array[point_index][cam_index])
print("\n\nShowing all B matrices (the Lourakis array of matrices):")
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
print(Lourakis_array[point_index][cam_index])
## We now estimate the Jacobian from the A and the B matrices computed. We need to do so
## in order to initialize the value of mu in the LM algorithm:
BAjacobian = numpy.asmatrix(numpy.zeros((num_measurements, num_params), dtype=float))
row_band_size = 2*num_cameras
for i in range(num_measurements):
for j in range(num_camera_params):
row_band_index = i // (2*num_cameras)
within_rb_index = i % (2*num_cameras)
row_index_for_matrix = within_rb_index // 2
ii = i // 2
jj = j // 6
if row_index_for_matrix == jj:
m = i%2
n = j%6
BAjacobian[i,j] = Argyros_array[row_band_index][jj][m,n]
for j in range(3):
row_band_index = i // (2*num_cameras)
within_rb_index = i % (2*num_cameras)
jj = num_cameras*6 + row_band_index * 3
m = i%2
n = j%3
BAjacobian[i,jj+j] = Lourakis_array[row_band_index][within_rb_index//2][m,n]
print("\n\nBAjacobian:")
print(BAjacobian)
print("\nsize of the BAjacobian: %s" % str(BAjacobian.shape))
if need_sanity_check is True:
print("\n\n|||||||||||||||||||||||| entering sanity checking code ||||||||||||||||||||||||||||||||||||")
try:
assert numpy.array_equal(jacobian, BAjacobian), \
"the sanity check based on exact equality failed --- will try approximate for equality"
except:
for row in range(num_measurements):
for col in range(num_params):
if abs(jacobian[row,col] - BAjacobian[row,col]) > 1e-9:
sys.exit("SANITY check failed even in the approximate sense for row=%d col=%d" %(row,col))
print("\n\n|||||||||||||||||||||||| exiting sanity checking code ||||||||||||||||||||||||||||||||||||")
if iteration_index == 0:
BA_JtJ = BAjacobian.T * BAjacobian
BA_diag_max = max(BA_JtJ.diagonal().tolist()[0])
print("\n\nmax on the diagonal on J^T.J: %s" % str(BA_diag_max))
self.alambda = BA_diag_max / 1000
alambda = self.alambda
print("\n\nWe start with alambda = %f" % self.alambda)
# This will serve the same purpose as the g vector for the LM algo
g_BA = BAjacobian.T * current_error
## Now we create the U and V arrays:
U_array = [numpy.asmatrix(numpy.zeros(shape=(6,6))) for _ in range(num_cameras)]
V_array = [numpy.asmatrix(numpy.zeros(shape=(3,3))) for _ in range(num_world_points)]
for cam_index in range(num_cameras):
for point_index in range(num_world_points):
U_array[cam_index] += Argyros_array[point_index][cam_index].T * Argyros_array[point_index][cam_index]
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
V_array[point_index] += Lourakis_array[point_index][cam_index].T * Lourakis_array[point_index][cam_index]
if self.debug2:
print("\n\nUarray:")
print(U_array)
print("\n\nVarray:")
print(V_array)
W_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
for cam_index in range(num_cameras):
for point_index in range(num_world_points):
W_array[point_index][cam_index] = Argyros_array[point_index][cam_index].T * \
Lourakis_array[point_index][cam_index]
if self.debug2:
print("\n\nShowing all W_array:")
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
print(W_array[point_index][cam_index])
## Now we need to compute \epsilon_a_j for the j-th camera
error_cam_param = [numpy.asmatrix(numpy.zeros(shape=(6,1))) for _ in range(num_cameras)]
## and \epsilon_b_i for the i-th point
error_struct_param = [numpy.asmatrix(numpy.zeros(shape=(3,1))) for _ in range(num_world_points)]
for cam_index in range(num_cameras):
for point_index in range(num_world_points):
error_cam_param[cam_index] += Argyros_array[point_index][cam_index].T * \
epsilon_array[point_index][cam_index]
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
error_struct_param[point_index] += Lourakis_array[point_index][cam_index].T * \
epsilon_array[point_index][cam_index]
if self.debug2:
print("\n\nDisplaying error_cam_param:")
print(error_cam_param)
print("\n\nDisplaying error_struct_param:")
print(error_struct_param)
## Now we need to augment each element of the square U_array and each element of the square V_array
## by adding \mu to the diagonal: (\mu in the paper is the same as alambda here)
Ustar_array = [U_array[j].copy() for j in range(num_cameras)] # if you have 6 cam params per cam
Vstar_array = [V_array[i].copy() for i in range(num_world_points)] # for the 3 coordinates of a world p
for cam_index in range(num_cameras):
for i in range(6):
Ustar_array[cam_index][i,i] += alambda
for point_index in range(num_world_points):
for i in range(3): ## arrays
Vstar_array[point_index][i,i] += alambda
if self.debug2:
print("\n\n\nDisplaying Ustar array:")
print(Ustar_array)
print("\n\n\nDisplaying Vstar array:")
print(Vstar_array)
## Now let us calculate the Y array:
Y_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
for cam_index in range(num_cameras):
for point_index in range(num_world_points):
Y_array[point_index][cam_index] = W_array[point_index][cam_index] * self._pseudoinverse(Vstar_array[point_index])
if self.debug2:
print("\n\nDisplay the Y array of matrices:")
print(Y_array)
error_cam = [numpy.asmatrix(numpy.zeros(shape=(6,1))) for _ in range(num_cameras)]
for cam_index in range(num_cameras):
tempsum = numpy.asmatrix(numpy.zeros(shape=(6,1)))
for point_index in range(num_world_points):
tempsum += (Y_array[point_index][cam_index] * error_struct_param[point_index])
error_cam[cam_index] = error_cam_param[cam_index] - tempsum
S_array = [[None for _ in range(num_cameras)] for _ in range(num_cameras)]
for cam_index1 in range(num_cameras):
for cam_index2 in range(num_cameras):
tempsum2 = numpy.asmatrix(numpy.zeros(shape=(6,6)))
for point_index in range(num_world_points):
tempsum2 += Y_array[point_index][cam_index1] * W_array[point_index][cam_index2].T
if cam_index1 == cam_index2:
S_array[cam_index1][cam_index2] = Ustar_array[cam_index1] - tempsum2
else:
S_array[cam_index1][cam_index2] = - tempsum2
if self.debug2:
print("\n\nThe S matrix:")
print(S_array)
# At this point S is a mxm matrix whose every element itself is a 6x6 matrix where 6 is for the
# six camera parameters for each camera position. m is the total number of camera positions.
S = numpy.asmatrix(numpy.zeros(shape=(6*num_cameras, 6*num_cameras)))
for i in range(num_cameras):
for j in range(num_cameras):
for m in range(6):
for n in range(6):
S[i*6+m, j*6+n] = S_array[i][j][m,n]
if self.debug2:
print("\n\nThe S matrix:")
print(S)
# We now define a long vector \Delta_cam that is a column-wise concatenation of the all the
# camera specific \delta_a in the error_cam array:
error_cam_concatenated = numpy.asmatrix(numpy.zeros(shape=(6*num_cameras,1)))
for cam_index in range(num_cameras):
error_cam_concatenated[cam_index*6:(cam_index+1)*6, 0] = error_cam[cam_index]
if self.debug2:
print("\n\nerror_cam_concatenated:")
print(error_cam_concatenated)
# Suppose \delta_a represents the next step size for the camera params for each camera. It is a column
# vec with 6 elements. When we concatenate it for all m cameras, we get a 6*m element long \Delta_cam
# that has the steps to take for all the camera parameters for all m cameras:
Delta_cam = self._pseudoinverse(S) * error_cam_concatenated
if self.debug2:
print("\n\nThe calculated deltas for the 6 parameters for all camera positions:")
print(Delta_cam)
# Now break Delta_cam into camera specific portions because you are going to need them later:
Delta_cam_array = [Delta_cam[cam_index*6:(cam_index+1)*6, 0] for cam_index in range(num_cameras)]
if self.debug2:
print("\n\nDelta_cam_array to show the individual camera components in Delta_cam")
print(Delta_cam_array)
# Next we need to calculate Delta_b for all the structure points. Delta_b is a column-wise concatenation
# of world-point specific delta_b_i that we calculate in the following loop:
Delta_b = numpy.asmatrix(numpy.zeros(shape=(3*num_world_points, 1)))
for point_index in range(num_world_points):
tempsum = numpy.asmatrix(numpy.zeros(shape=(3,1)))
for cam_index in range(num_cameras):
tempsum += W_array[point_index][cam_index].T * Delta_cam_array[cam_index]
Delta_b[point_index*3:(point_index+1)*3, 0] = self._pseudoinverse(Vstar_array[point_index]) * (error_struct_param[point_index] - tempsum)
if self.debug2:
print("\n\nDelta_b column vector:")
print(Delta_b)
Delta_all = numpy.asmatrix(numpy.zeros(shape=(6*num_cameras + 3*num_world_points, 1)))
Delta_all[:6*num_cameras,0] = Delta_cam
Delta_all[6*num_cameras:,0] = Delta_b
Delta_all_as_list = Delta_all.flatten().tolist()[0]
if self.debug2:
print("\n\nDelta_all_as_list: %s" % str(Delta_all_as_list))
new_delta_param = Delta_all
if need_sanity_check is True:
left_side_eqn_9 = sanity_A + alambda * numpy.asmatrix(numpy.identity(num_params))
sanity_B = self._pseudoinverse(sanity_A + alambda * numpy.asmatrix(numpy.identity(num_params)))
sanity_new_delta_param = sanity_B * sanity_g
print("\n\nThe delta in params as produced by LM: %s" % str(sanity_new_delta_param.flatten().tolist()[0]))
new_param_values = list(map(lambda x,y:x+y, current_param_values, Delta_all_as_list))
if self.debug2:
print("\n\nnew parameter values:")
print(new_param_values)
new_params_dict = {params_list[i] : new_param_values[i] for i in range(num_params)}
if self.debug2:
print("\nnew_params_dict: %s" % str(sorted(new_params_dict.items())))
new_fit_to_measurements = numpy.asmatrix(numpy.zeros_like(self.X_BA))
for i in range(num_measurements):
new_fit_to_measurements[i,0] = eval(self._eval_functional_element(self.Fvec_BA[i,0], new_params_dict))
if self.debug2:
print("\nnew_fit_to_measurements (shown as transpose):")
print(str(new_fit_to_measurements.T))
new_error = self.X_BA - new_fit_to_measurements
if self.debug2:
print("\n\nnew_error:")
print(new_error.T)
epsilon_array = [[None for _ in range(num_cameras)] for _ in range(num_world_points)]
current_error_as_list = current_error.flatten().tolist()[0]
epsilons_arranged_by_points = [current_error_as_list[pt*2*num_cameras : (pt+1)*2*num_cameras] for pt in range(num_world_points)]
for point_index in range(num_world_points):
for cam_index in range(num_cameras):
epsilon_array[point_index][cam_index] = numpy.matrix([epsilons_arranged_by_points[point_index][2*cam_index],
epsilons_arranged_by_points[point_index][2*cam_index+1]]).T
print("\nnew error at iteration %d:" % iteration_index)
print(new_error.flatten().tolist()[0])
new_error_norm = numpy.linalg.norm(new_error)
new_error_norm_per_measurement = new_error_norm / math.sqrt(len(self.X_BA))
print("\nnew error norm per measurement: %s" % str(new_error_norm_per_measurement))
newly_projected_pixel_coordinates = new_fit_to_measurements.flatten().tolist()[0]
newly_projected_pixels = [(newly_projected_pixel_coordinates[2*x], newly_projected_pixel_coordinates[2*x+1]) for x in range(len(newly_projected_pixel_coordinates) // 2)]
rho = ( error_norm_with_iteration[-1] ** 2 - new_error_norm ** 2 ) / \
(new_delta_param.T * g_BA + alambda * new_delta_param.T * new_delta_param)
if rho <= 0.0:
need_fresh_jacobian_flag = False
alambda = alambda * max( [1.0/3.0, 1.0 - (2.0 * rho - 1.0) ** 3] ) ## BIZARRE that a matrix is returned
alambda = alambda.tolist()[0][0]
if alambda > 1e11:
print("\nIterations terminated because alambda exceeded limit")
break
print("\n\nThe current GN direction did not work out. Will try a new direction.")
if self.debug:
print("\nNO change in parameters for iteration_index: %d with alambda = %f" % (iteration_index, alambda))
Ustar_array = Vstar_array = Delta_cam = Delta_b = Delta_all = None
continue
else:
need_fresh_jacobian_flag = True
print("\n\n==================== Showing Results for SBA ITERATION: %d ==========================="
% (iteration_index + 1))
productive_iteration_index_values.append(iteration_index)
alambda = self.alambda
error_norm_per_measurement_with_iteration.append(new_error_norm_per_measurement)
error_norm_with_iteration.append(new_error_norm)
print("\n\nerror norms per measurement for all iterations: %s" % str(error_norm_per_measurement_with_iteration))
current_param_values = new_param_values
best_param_values = new_param_values
best_predicted_pixels = newly_projected_pixels
iterations_used = iteration_index + 1
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
if self.problem.startswith("sfm"):
num_structure_points = int(self.problem.split("_")[1])
estimated_structure = current_param_values[-3*num_structure_points:]
estimated_structure = [estimated_structure[3*i:3*i+3] for i in range(num_structure_points)]
best_estimated_structure = estimated_structure
best_error_norm_per_measurement = new_error_norm_per_measurement
self.display_function(best_predicted_pixels, estimated_structure, new_error_norm_per_measurement, len(productive_iteration_index_values)-1)
if self.debug:
print("\nerror norms with iterations: %s" % str(error_norm_per_measurement_with_iteration))
# print("\niterations used: %d" % (len(productive_iteration_index_values) - 1))
print("\niterations used: %d" % iterations_used)
print("\nproductive iteration index values: %s" % str(productive_iteration_index_values))
print("\n\nfinal values for the parameters: ")
print(str(new_param_values))
if self.debug is True and iteration_index == self.max_iterations - 1:
print("\n\nWARNING: max iterations reached without getting to the minimum")
## In the following 'if' block, note that 'self.display_function' for an instance of NonlinearLeastSquares
## is set by the method:
##
## set_constructor_options_for_optimizer(self, algo)
##
## of the ProjectiveCamera class. IN THAT CLASS, the display_function is set to the function of that class
##
## display_structure_and_pixels()
##
if self.display_function is not None:
if self.problem.startswith("sfm"):
self.display_function(best_predicted_pixels, estimated_structure, new_error_norm_per_measurement, len(productive_iteration_index_values)-1)
else:
if len(productive_iteration_index_values) % 2 == 0:
self.display_function(new_fit_to_measurements, new_error_norm, len(productive_iteration_index_values)-1)
result = {"error_norms_with_iterations" : error_norm_per_measurement_with_iteration,
# "number_of_iterations" : len(productive_iteration_index_values) - 1,
"number_of_iterations" : iterations_used,
"parameter_values" : new_param_values}
return result
###%%%
################################## Private Methods of NonlinearLeastSquares ################################
def _get_initial_params_from_file(self, filename):
if not filename.endswith('.txt'):
sys.exit("Aborted. _get_initial_params_from_file() is only for CSV files")
initial_params_dict = {}
# initial_params_list = [line for line in [line.strip() for line in open(filename,"rU")] if line is not '']
initial_params_list = [line for line in [line.strip() for line in open(filename,"rU")] if line != '']
for record in initial_params_list:
initial_params_dict[record[:record.find('=')].rstrip()] = float(record[record.find('=')+1:].lstrip())
self.params_dict = initial_params_dict
self.params_ordered_list = sorted(self.params_dict) if self.params_ordered_list is not None else self.params_arranged_list
return initial_params_dict
def _get_measured_data_from_text_file(self, filename):
if not filename.endswith('.txt'):
sys.exit("Aborted. _get_measured_data_from_text_file() is only for txt files")
all_data = list(map(float, open(filename).read().split()))
if self.debug:
print("_get_measured_data_from_text_file: all_data")
print(str(all_data))
X = numpy.matrix(all_data).T
xnorm = numpy.linalg.norm(X)
if self.debug:
print("_get_measured_data_from_text_file: norm of X: %s" % str(xnorm))
def _eval_functional_element(self, Fvec_element, params_dict):
'''
Evaluates one element of the prediction vector Fvec by substituting for its parameters
(both camera and structure) the values as supplied by the dictionary params_dict
This is the evaluation function used for the basic LM algorithm.
'''
augmented_element = Fvec_element
for param in params_dict:
regex = r'\b' + param + r'\b'
if isinstance(augmented_element, (bytes)):
if re.search(regex, augmented_element.decode('utf-8')):
augmented_element = re.sub(regex, str(params_dict[param]), augmented_element.decode('utf-8'))
else:
if re.search(regex, augmented_element):
augmented_element = re.sub(regex, str(params_dict[param]), augmented_element)
return augmented_element
def _eval_functional_element2(self, Fvec_element, param_list, param_val_list):
'''
Although this method does basically the same thing as the previous method, this one
is meant for the sparse-bundle-adjustment implementation of LM. Now the second argument,
param_val_list, is a list for the current values for the camera parameters --- BUT ONLY FOR
ONE CAMERA --- and for the structure parameters. Note that this method was written assuming
that the second argument is a list as opposed to a dict.
'''
augmented_element = Fvec_element
for i,param in enumerate(param_list):
regex = r'\b' + param + r'\b'
if isinstance(augmented_element, (bytes)):
if re.search(regex, augmented_element.decode('utf-8')):
augmented_element = re.sub(regex, str(param_val_list[i]), augmented_element.decode('utf-8'))
else:
if re.search(regex, augmented_element):
augmented_element = re.sub(regex, str(param_val_list[i]), augmented_element)
return augmented_element
def _pseudoinverse(self, A):
return (A.T * A).I * A.T