__version__ = '1.1.4'
__author__ = "Avinash Kak (kak@purdue.edu)"
__date__ = '2024-January-28'
__url__ = 'https://engineering.purdue.edu/kak/distCGP/ComputationalGraphPrimer-1.1.4.html'
__copyright__ = "(C) 2024 Avinash Kak. Python Software Foundation."
import sys,os,os.path
import numpy as np
import re
import operator
import itertools
import math
import random
import torch
from collections import deque
import copy
import matplotlib.pyplot as plt
import networkx as nx
class Exp:
'''
With CGP, you can handcraft a neural network (actually you can handcraft any DAG) by designating
the nodes and the links between them with expressions like
expressions = [ 'xx=xa^2',
'xy=ab*xx+ac*xa',
'xz=bc*xx+xy',
'xw=cd*xx+xz^3' ]
In these expressions, names beginning with 'x' denote the nodes in the DAG, and all other names
(such as 'ab', 'ac', 'bc', etc.) designate the learnable parameters. The variable on the left of
the '=' symbol is the dependent_var and the variables on the right are, as you guessed, the
right_vars. Since the learnable parameters are always on the right of the equality sign, we refer
to them as right_params in what is shown below. Expressions of this sort are parsed by the parser
functions in CGP, which output one instance of the Exp class for each expression. The example
above uses 4 expressions to create a DAG; of course, you can have any number of them.
'''
def __init__(self, exp, body, dependent_var, right_vars, right_params):
self.exp = exp
self.body = body
self.dependent_var = dependent_var
self.right_vars = right_vars
self.right_params = right_params
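## Illustration (a sketch of the parser's expected output, not code that is executed here): for the
## expression 'xy=ab*xx+ac*xa' shown above, one would expect the parser to construct an Exp instance with
##
##        exp_obj.exp             =>   'xy=ab*xx+ac*xa'
##        exp_obj.body            =>   'ab*xx+ac*xa'
##        exp_obj.dependent_var   =>   'xy'
##        exp_obj.right_vars      =>   ['xx', 'xa']
##        exp_obj.right_params    =>   ['ab', 'ac']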
#______________________________ ComputationalGraphPrimer Class Definition ________________________________
class ComputationalGraphPrimer(object):
def __init__(self, *args, **kwargs ):
if args:
raise ValueError(
'''ComputationalGraphPrimer constructor can only be called with keyword arguments for
the following keywords: expressions, output_vars, dataset_size, grad_delta,
learning_rate, display_loss_how_often, one_neuron_model, training_iterations,
batch_size, num_layers, layers_config, epochs, for_verification_only and debug''')
expressions = output_vars = dataset_size = grad_delta = display_loss_how_often = learning_rate = None
one_neuron_model = training_iterations = batch_size = num_layers = layers_config = epochs = for_verification_only = debug = None
if 'one_neuron_model' in kwargs : one_neuron_model = kwargs.pop('one_neuron_model')
if 'batch_size' in kwargs : batch_size = kwargs.pop('batch_size')
if 'num_layers' in kwargs : num_layers = kwargs.pop('num_layers')
if 'layers_config' in kwargs : layers_config = kwargs.pop('layers_config')
if 'expressions' in kwargs : expressions = kwargs.pop('expressions')
if 'output_vars' in kwargs : output_vars = kwargs.pop('output_vars')
if 'dataset_size' in kwargs : dataset_size = kwargs.pop('dataset_size')
if 'learning_rate' in kwargs : learning_rate = kwargs.pop('learning_rate')
if 'training_iterations' in kwargs : training_iterations = kwargs.pop('training_iterations')
if 'grad_delta' in kwargs : grad_delta = kwargs.pop('grad_delta')
if 'display_loss_how_often' in kwargs : display_loss_how_often = kwargs.pop('display_loss_how_often')
if 'epochs' in kwargs : epochs = kwargs.pop('epochs')
if 'for_verification_only' in kwargs : for_verification_only = kwargs.pop('for_verification_only')
if 'debug' in kwargs : debug = kwargs.pop('debug')
if len(kwargs) != 0: raise ValueError('''You have provided unrecognizable keyword args''')
self.one_neuron_model = True if one_neuron_model is not None else False
if training_iterations:
self.training_iterations = training_iterations
self.batch_size = batch_size if batch_size else 4
self.num_layers = num_layers
self.for_verification_only = for_verification_only
if layers_config:
self.layers_config = layers_config
if expressions:
self.expressions = expressions
if output_vars:
self.output_vars = output_vars
if dataset_size:
self.dataset_size = dataset_size
if learning_rate:
self.learning_rate = learning_rate
else:
self.learning_rate = 1e-6
if grad_delta:
self.grad_delta = grad_delta
else:
self.grad_delta = 1e-4
if display_loss_how_often:
self.display_loss_how_often = display_loss_how_often
if dataset_size:
self.dataset_input_samples = {i : None for i in range(dataset_size)}
self.true_output_vals = {i : None for i in range(dataset_size)}
self.vals_for_learnable_params = None
self.epochs = epochs
if debug:
self.debug = debug
else:
self.debug = 0
self.gradient_of_loss = None
self.gradients_for_learnable_params = None
self.expressions_dict = {}
self.LOSS = [] ## loss values for all iterations of training
self.all_vars = set()
if (one_neuron_model is True) or (num_layers is None) or (for_verification_only is True):
self.independent_vars = []
self.learnable_params = []
else:
self.independent_vars = set()
self.learnable_params = set()
self.dependent_vars = {}
self.depends_on = {} ## See Introduction for the meaning of this
self.leads_to = {} ## See Introduction for the meaning of this
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
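## A minimal sketch of how this constructor might be called for the one-neuron case. The keyword
## values below are purely illustrative and the expression is only patterned after the naming
## conventions described in the Exp docstring; see the scripts in the Examples directory for the
## settings actually used there:
##
##      cgp = ComputationalGraphPrimer(
##                     one_neuron_model = True,
##                     expressions = ['xw=ab*xa+bc*xb+cd*xc+ac*xd'],
##                     output_vars = ['xw'],
##                     dataset_size = 5000,
##                     learning_rate = 1e-3,
##                     training_iterations = 40000,
##                     batch_size = 8,
##                     display_loss_how_often = 100 )
##      cgp.parse_expressions()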
def parse_expressions(self):
'''
This method creates a DAG from a set of expressions that involve variables and learnable
parameters. The expressions are based on the assumption that a symbolic name that starts
with the letter 'x' is a variable, with all other symbolic names being learnable parameters.
The computational graph is represented by two dictionaries, 'depends_on' and 'leads_to'.
To illustrate the meaning of the dictionaries, something like "depends_on['xz']" would be
set to a list of all other variables whose outgoing arcs end in the node 'xz'. So
something like "depends_on['xz']" is best read as "node 'xz' depends on ...." where the
dots stand for the array of nodes that is the value of "depends_on['xz']". On the other
hand, the 'leads_to' dictionary has the opposite meaning. That is, something like
"leads_to['xz']" is set to the array of nodes at the ends of all the arcs that emanate
from 'xz'.
'''
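## As a concrete illustration (a sketch of the expected outcome for the example expressions shown in
## the docstring of the Exp class at the top of this file), parsing
##      ['xx=xa^2', 'xy=ab*xx+ac*xa', 'xz=bc*xx+xy', 'xw=cd*xx+xz^3']
## would be expected to yield
##      depends_on  =>  {'xx': ['xa'], 'xy': ['xx', 'xa'], 'xz': ['xx', 'xy'], 'xw': ['xx', 'xz']}
##      leads_to    =>  {'xa': {'xx', 'xy'}, 'xx': {'xy', 'xz', 'xw'}, 'xy': {'xz'}, 'xz': {'xw'}, 'xw': set()}
## with 'xa' as the only independent variable and {'ab', 'ac', 'bc', 'cd'} as the learnable params.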
self.exp_objects = []
self.var_to_int_labels = {} ## needed for DAG visualization with networkx
self.var_to_var_param = {}
node_int_label = 0 ## initialize int labels
for exp in self.expressions:
left,right = exp.split('=')
self.expressions_dict[left] = right
self.depends_on[left] = []
parts = re.findall('([a-zA-Z]+)', right)
parts_in_pairs_dict = { parts[2*i+1] : parts[2*i] for i in range(len(parts)//2) } ## for DAG visualization
self.var_to_var_param[left] = parts_in_pairs_dict
right_vars = []
right_params = []
for part in parts:
if part.startswith('x'):
self.all_vars.add(part)
self.depends_on[left].append(part)
right_vars.append(part)
self.var_to_int_labels[part] = node_int_label
node_int_label += 1
else:
if (self.one_neuron_model is True) or (self.for_verification_only is True):
self.learnable_params.append(part)
else:
self.learnable_params.add(part)
right_params.append(part)
self.var_to_int_labels[left] = node_int_label
self.all_vars.add(left)
exp_obj = Exp(exp, right, left, right_vars, right_params)
self.exp_objects.append(exp_obj)
if self.debug:
print("\n\nall variables: %s" % str(self.all_vars))
print("\n\nlearnable params: %s" % str(self.learnable_params))
print("\n\ndependencies: %s" % str(self.depends_on))
print("\n\nexpressions dict: %s" % str(self.expressions_dict))
print("\n\nvar_to_var_param dict: ", self.var_to_var_param)
print("\n\nnode to int labels: ", self.var_to_int_labels)
for var in self.all_vars:
if var not in self.depends_on: # that is, var is not a key in the depends_on dict
if (self.one_neuron_model is True) or (self.for_verification_only is True):
self.independent_vars.append(var)
else:
self.independent_vars.add(var)
self.input_size = len(self.independent_vars)
if self.debug:
print("\n\nindependent vars: %s" % str(self.independent_vars))
self.dependent_vars = [var for var in self.all_vars if var not in self.independent_vars]
self.output_size = len(self.dependent_vars)
self.leads_to = {var : set() for var in self.all_vars}
for k,v in self.depends_on.items():
for var in v:
self.leads_to[var].add(k)
if self.debug:
print("\n\nleads_to dictionary: %s" % str(self.leads_to))
def parse_multi_layer_expressions(self):
'''
This method is a modification of the previous expression parser and is meant specifically
for the case when the given set of expressions is supposed to define a multi-layer neural
network. The naming conventions for the variables, which designate the nodes in the layers
of the network, and for the learnable parameters remain the same as in the previous function.
'''
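## A sketch of the sort of input this parser expects (patterned after, but not copied verbatim from,
## the multi_neuron_classifier.py example): with layers_config = [4,2,1] and num_layers = 3, the
## expressions might look like
##      expressions = [ 'xw=ap*xp+aq*xq+ar*xr+as*xs',
##                      'xz=bp*xp+bq*xq+br*xr+bs*xs',
##                      'xo=cp*xw+cq*xz' ]
## in which case the first two expressions would land in self.layer_expressions[1] (the two hidden
## nodes) and the third in self.layer_expressions[2] (the lone output node).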
self.exp_objects = []
self.var_to_int_labels = {} ## needed for DAG visualization with networkx
self.var_to_var_param = {}
node_int_label = 0 ## initialize int labels
self.layer_expressions = { i : [] for i in range(1,self.num_layers) }
self.layer_exp_objects = { i : [] for i in range(1,self.num_layers) }
## A deque is a double-ended queue in which elements can be inserted and deleted at both ends.
all_expressions = deque(self.expressions)
for layer_index in range(self.num_layers - 1):
for node_index in range(self.layers_config[layer_index+1]):
self.layer_expressions[layer_index+1].append( all_expressions.popleft() )
print("\n\nself.layer_expressions: ", self.layer_expressions)
self.layer_vars = {i : [] for i in range(self.num_layers)} # layer indexing starts at 0
self.layer_params = {i : [] for i in range(1,self.num_layers)} # layer indexing starts at 1
for layer_index in range(1,self.num_layers):
for exp in self.layer_expressions[layer_index]:
left,right = exp.split('=')
self.expressions_dict[left] = right
self.depends_on[left] = []
parts = re.findall('([a-zA-Z]+)', right)
parts_in_pairs_dict = { parts[2*i+1] : parts[2*i] for i in range(len(parts)//2) }
self.var_to_var_param[left] = parts_in_pairs_dict
right_vars = []
right_params = []
for part in parts:
if part.startswith('x'):
self.all_vars.add(part)
self.depends_on[left].append(part)
right_vars.append(part)
if part not in self.var_to_int_labels:
self.var_to_int_labels[part] = node_int_label
node_int_label += 1
else:
self.learnable_params.add(part)
right_params.append(part)
self.layer_vars[layer_index-1] = right_vars
self.layer_vars[layer_index].append(left)
self.layer_params[layer_index].append(right_params)
self.var_to_int_labels[left] = node_int_label
node_int_label += 1
self.all_vars.add(left)
exp_obj = Exp(exp, right, left, right_vars, right_params)
self.layer_exp_objects[layer_index].append(exp_obj)
if self.debug:
print("\n\n[layer index: %d] all variables: %s" % (layer_index, str(self.all_vars)))
print("\n\n[layer index: %d] learnable params: %s" % (layer_index, str(self.learnable_params)))
print("\n\n[layer index: %d] dependencies: %s" % (layer_index, str(self.depends_on)))
print("\n\n[layer index: %d] expressions dict: %s" % (layer_index, str(self.expressions_dict)))
print("\n\n[layer index: %d] var_to_var_param dict: %s" % (layer_index, str(self.var_to_var_param)))
print("\n\n[layer index: %d] node to int labels: %s" % (layer_index, str(self.var_to_int_labels)))
for var in self.all_vars:
if var not in self.depends_on: # that is, var is not a key in the depends_on dict
self.independent_vars.add(var)
self.input_size = len(self.independent_vars)
if self.debug:
print("\n\n[layer index: %d] independent vars: %s" % (layer_index, str(self.independent_vars)))
self.dependent_vars = [var for var in self.all_vars if var not in self.independent_vars]
self.output_size = len(self.dependent_vars)
self.leads_to = {var : set() for var in self.all_vars}
for k,v in self.depends_on.items():
for var in v:
self.leads_to[var].add(k)
self.all_params = []
for layer_index in self.layer_params:
for i in range(len(self.layer_params[layer_index])):
self.all_params += self.layer_params[layer_index][i]
if self.debug:
print("\n\n[layer index: %d] leads_to dictionary: %s" % (layer_index, str(self.leads_to)))
print("\n\n[Final] independent vars: %s" % str(self.independent_vars))
print("\n\n[Final] self.layer_vars: ", self.layer_vars)
print("\n\n[Final] self.layer_params: ", self.layer_params)
print("\n\n[Final] self.layer_exp_objects: ", self.layer_exp_objects)
def parse_general_dag_expressions(self):
'''
This method is a modification of the previous expression parser and is meant specifically
for the case when the given set of expressions is supposed to define a general DAG. The
naming conventions for the variables, which designate the nodes of the DAG, and for the
learnable parameters remain the same as in the previous functions.
'''
self.exp_objects = []
self.var_to_int_labels = {} ## needed for DAG visualization with networkx
self.var_to_var_param = {}
node_int_label = 0 ## initialize int labels
for exp in self.expressions:
left,right = exp.split('=')
self.expressions_dict[left] = right
self.depends_on[left] = []
parts = re.findall('([a-zA-Z]+)', right)
parts_in_pairs_dict = { parts[2*i+1] : parts[2*i] for i in range(len(parts)//2) }
self.var_to_var_param[left] = parts_in_pairs_dict
right_vars = []
right_params = []
for part in parts:
if part.startswith('x'):
self.all_vars.add(part)
self.depends_on[left].append(part)
right_vars.append(part)
if part not in self.var_to_int_labels:
self.var_to_int_labels[part] = node_int_label
node_int_label += 1
else:
self.learnable_params.append(part)
right_params.append(part)
self.var_to_int_labels[left] = node_int_label
node_int_label += 1
self.all_vars.add(left)
exp_obj = Exp(exp, right, left, right_vars, right_params)
self.exp_objects.append(exp_obj)
if self.debug:
print("\n\nall variables: %s" % str(self.all_vars))
print("\n\nlearnable params: %s" % str(self.learnable_params))
print("\n\ndependencies: %s" % str(self.depends_on))
print("\n\nexpressions dict: %s" % str(self.expressions_dict))
print("\n\nvar_to_var_param dict: ", self.var_to_var_param)
print("\n\nnode to int labels: ", self.var_to_int_labels)
for var in self.all_vars:
if var not in self.depends_on: # that is, var is not a key in the depends_on dict
self.independent_vars.append(var)
self.input_size = len(self.independent_vars)
if self.debug:
print("\n\nindependent vars: %s" % str(self.independent_vars))
self.dependent_vars = [var for var in self.all_vars if var not in self.independent_vars]
self.output_size = len(self.dependent_vars)
self.leads_to = {var : set() for var in self.all_vars}
for k,v in self.depends_on.items():
for var in v:
self.leads_to[var].add(k)
if self.debug:
print("\n\nleads_to dictionary: %s" % str(self.leads_to))
def display_DAG(self):
"""
The network visualization code in this script should work for any general DAG defined in
an instance of CGP. For an example, see the script
graph_based_dataflow.py
in the Examples directory of the module.
"""
node_vars = range(len(self.all_vars))          ## one integer node index for each variable in the DAG
labels = {}
for var in self.all_vars:
labels[ self.var_to_int_labels[var] ] = r"$%s$" % var
edges = []
for source_var in self.leads_to:
dest_vars = self.leads_to[source_var]
for dest_var in dest_vars:
edges.append( (self.var_to_int_labels[source_var], self.var_to_int_labels[ dest_var ] ) )
edge_labels = {}
output_vars = list(self.var_to_var_param.keys())
for out_var in output_vars:
out_node = self.var_to_int_labels[out_var]
var_param_pairs = self.var_to_var_param[out_var]
for i in range(len(var_param_pairs)):
source_var = list(var_param_pairs.keys())[i]
edge_labels[ ( self.var_to_int_labels[source_var], out_node ) ] = r"$%s$" % var_param_pairs[source_var]
G = nx.DiGraph()
G.add_nodes_from(node_vars)
G.add_edges_from(edges)
pos = nx.circular_layout(G)
plt.figure(figsize=(8, 8))
options = {"edgecolors" : "tab:gray", "node_size" : 2000, "node_color" : "yellow", "arrowsize" : 25, "alpha" : 0.9}
nx.draw(G, pos, with_labels=False, **options)
nx.draw_networkx_labels(G, pos, labels, font_size=22, font_color="black")
nx.draw_networkx_edge_labels(G, pos, edge_labels, font_size=12, font_color="black", label_pos=0.75)
plt.axis("equal")
plt.show()
def display_one_neuron_network(self):
"""
In version 1.1.0, I generalized this code to work on any one-neuron network as defined in the
example:
one_neuron_classifier.py
in the Examples directory of the module.
"""
subset_sizes = [len(self.independent_vars), 1]        ## the input nodes followed by the single output node
subset_color = [
"gold",
"violet",
"violet",
"violet",
"violet",
"limegreen",
"limegreen",
"darkorange",
]
output_vars = self.dependent_vars
input_vars = self.independent_vars
num_output_vars = len(output_vars)
num_input_vars = len(input_vars)
if num_output_vars > 1:
sys.exit("\n\nAborted. For the one-neuron case, you are only allowed one output var. \
You have %d output vars" % num_output_vars)
## Declare the node labels using the index values mentioned above
labels = {}
for var in self.all_vars:
labels[ self.var_to_int_labels[var] ] = r"$%s$" % var
## Declare the edge labels --- again using the node index values mentioned previously
edge_labels = {}
out_node = self.var_to_int_labels[ output_vars[0] ]
var_param_pairs = self.var_to_var_param[output_vars[0] ]
for i in range(len(var_param_pairs)):
source_var = list(var_param_pairs.keys())[i]
edge_labels[ ( self.var_to_int_labels[source_var], out_node ) ] = r"$%s$" % var_param_pairs[source_var]
def multilayered_graph(*subset_sizes):
'''
Defines a multi-partite graph you need for a neural network
'''
extents = nx.utils.pairwise(itertools.accumulate((0,) + subset_sizes))
layers = [range(start, end) for start, end in extents]
G = nx.DiGraph()
for (i, layer) in enumerate(layers):
G.add_nodes_from(layer, layer=i)
for layer1, layer2 in nx.utils.pairwise(layers):
G.add_edges_from(itertools.product(layer1, layer2))
return G
G = multilayered_graph(*subset_sizes)
color = [subset_color[data["layer"]] for v, data in G.nodes(data=True)]
pos = nx.multipartite_layout(G, subset_key="layer")
plt.figure(figsize=(8, 8))
options = {"edgecolors" : "tab:gray", "node_size" : 2000, "arrowsize" : 25, "alpha" : 0.9}
nx.draw(G, pos, node_color=color, with_labels=False, **options)
nx.draw_networkx_labels(G, pos, labels, font_size=22, font_color="black")
nx.draw_networkx_edge_labels(G, pos, edge_labels, font_size=12, font_color="black", label_pos=0.75)
plt.axis("equal")
plt.show()
def display_multi_neuron_network(self):
"""
In version 1.1.0, I made this network visualization more general and (if it has no bugs) it should
work with any multi-layer network graph, such as the one shown in
multi_neuron_classifier.py
in the Examples directory of the module.
"""
## This tells us how many nodes in each layer and what the integer indexing for the nodes should be
subset_sizes = self.layers_config
subset_color = [
"gold",
"violet",
"violet",
"violet",
"violet",
"limegreen",
"limegreen",
"darkorange",
]
## Declare the node labels using the integer index values for the nodes
labels = {}
for var in self.all_vars:
labels[ self.var_to_int_labels[var] ] = r"$%s$" % var
## Declare the edge labels using the integer index values for the nodes:
edge_labels = {}
output_vars = list(self.var_to_var_param.keys())
for out_var in output_vars:
out_node = self.var_to_int_labels[out_var]
var_param_pairs = self.var_to_var_param[out_var]
for i in range(len(var_param_pairs)):
source_var = list(var_param_pairs.keys())[i]
edge_labels[ ( self.var_to_int_labels[source_var], out_node ) ] = r"$%s$" % var_param_pairs[source_var]
def multilayered_graph(*subset_sizes):
'''
Defines a multi-partite graph you need for a neural network
'''
extents = nx.utils.pairwise(itertools.accumulate((0,) + subset_sizes))
layers = [range(start, end) for start, end in extents]
G = nx.DiGraph()
for (i, layer) in enumerate(layers):
G.add_nodes_from(layer, layer=i)
for layer1, layer2 in nx.utils.pairwise(layers):
G.add_edges_from(itertools.product(layer1, layer2))
return G
G = multilayered_graph(*subset_sizes)
color = [subset_color[data["layer"]] for v, data in G.nodes(data=True)]
pos = nx.multipartite_layout(G, subset_key="layer")
## Declare a 'figure' object you need for drawing the multi-partite graph and set its size
plt.figure(figsize=(8, 8))
options = {"edgecolors" : "tab:gray", "node_size" : 2000, "arrowsize" : 25, "alpha" : 0.9}
nx.draw(G, pos, node_color=color, with_labels=False, **options)
nx.draw_networkx_labels(G, pos, labels, font_size=22, font_color="black")
nx.draw_networkx_edge_labels(G, pos, edge_labels, font_size=12, font_color="black", label_pos=0.75)
plt.axis("equal")
plt.show()
def display_network1(self):
G = nx.DiGraph()
G.add_nodes_from(self.all_vars)
edges = []
for ver1 in self.leads_to:
for ver2 in self.leads_to[ver1]:
edges.append( (ver1,ver2) )
G.add_edges_from( edges )
nx.draw(G, with_labels=True, font_weight='bold')
plt.show()
def display_network2(self):
'''
Provides a fancier display of the network graph
'''
G = nx.DiGraph()
G.add_nodes_from(self.all_vars)
edges = []
for ver1 in self.leads_to:
for ver2 in self.leads_to[ver1]:
edges.append( (ver1,ver2) )
G.add_edges_from( edges )
pos = nx.circular_layout(G)
nx.draw(G, pos, with_labels = True, edge_color='b', node_color='lightgray',
arrowsize=20, arrowstyle='fancy', node_size=1200, font_size=20,
font_color='black')
plt.title("Computational graph for the expressions")
plt.show()
######################################################################################################
######################################### one neuron model ###########################################
def run_training_loop_one_neuron_model(self, training_data):
"""
The training loop must first initialize the learnable parameters. Remember, these are the
symbolic names in your input expressions for the neural layer that do not begin with the
letter 'x'. In this case, we are initializing with random numbers from a uniform distribution
over the interval (0,1).
"""
self.vals_for_learnable_params = {param: random.uniform(0,1) for param in self.learnable_params}
self.bias = random.uniform(0,1) ## Adding the bias improves class discrimination.
## We initialize it to a random number.
class DataLoader:
"""
To understand the logic of the dataloader, it would help if you first understand how
the training dataset is created. Search for the following function in this file:
gen_training_data(self)
As you will see in the implementation code for this method, the training dataset
consists of a Python dict with two keys, 0 and 1, the former points to a list of
all Class 0 samples and the latter to a list of all Class 1 samples. In each list,
the data samples are drawn from a multi-dimensional Gaussian distribution. The two
classes have different means and variances. The dimensionality of each data sample
is set by the number of nodes in the input layer of the neural network.
The data loader's job is to construct a batch of samples drawn randomly from the two
lists mentioned above. And it must also associate the class label with each sample
separately.
"""
def __init__(self, training_data, batch_size):
self.training_data = training_data
self.batch_size = batch_size
self.class_0_samples = [(item, 0) for item in self.training_data[0]] ## Associate label 0 with each sample
self.class_1_samples = [(item, 1) for item in self.training_data[1]] ## Associate label 1 with each sample
def __len__(self):
return len(self.training_data[0]) + len(self.training_data[1])
def _getitem(self):
cointoss = random.choice([0,1]) ## When a batch is created by getbatch(), we want the
## samples to be chosen randomly from the two lists
if cointoss == 0:
return random.choice(self.class_0_samples)
else:
return random.choice(self.class_1_samples)
def getbatch(self):
batch_data,batch_labels = [],[] ## First list for samples, the second for labels
maxval = 0.0 ## For approximate batch data normalization
for _ in range(self.batch_size):
item = self._getitem()
if np.max(item[0]) > maxval:
maxval = np.max(item[0])
batch_data.append(item[0])
batch_labels.append(item[1])
batch_data = [item/maxval for item in batch_data] ## Normalize batch data
batch = [batch_data, batch_labels]
return batch
data_loader = DataLoader(training_data, batch_size=self.batch_size)
loss_running_record = []
i = 0
avg_loss_over_iterations = 0.0 ## Average the loss over iterations for printing out
## every N iterations during the training loop.
for i in range(self.training_iterations):
data = data_loader.getbatch()
data_tuples_in_batch = data[0]
class_labels_in_batch = data[1]
y_preds, deriv_sigmoids = self.forward_prop_one_neuron_model(data_tuples_in_batch) ## FORWARD PROP of data
loss = sum([(abs(class_labels_in_batch[i] - y_preds[i]))**2 for i in range(len(class_labels_in_batch))]) ## Find loss
avg_loss_over_iterations += loss / float(len(class_labels_in_batch))
if i%(self.display_loss_how_often) == 0:
avg_loss_over_iterations /= self.display_loss_how_often
loss_running_record.append(avg_loss_over_iterations)
print("[iter=%d] loss = %.4f" % (i+1, avg_loss_over_iterations)) ## Display average loss
avg_loss_over_iterations = 0.0 ## Re-initialize avg loss
y_errors_in_batch = list(map(operator.sub, class_labels_in_batch, y_preds))
self.backprop_and_update_params_one_neuron_model(data_tuples_in_batch, y_preds, y_errors_in_batch, deriv_sigmoids) ## BACKPROP loss
plt.figure()
plt.plot(loss_running_record)
plt.show()
def forward_prop_one_neuron_model(self, data_tuples_in_batch):
"""
Forward propagates the batch data through the neural network according to the equations on
Slide 50 of my Week 3 slides.
As the one-neuron model is characterized by a single expression, the main job of this function is
to evaluate that expression for each data tuple in the incoming batch. The resulting output is
fed into the sigmoid activation function and the partial derivative of the sigmoid with respect
to its input is calculated.
"""
output_vals = []
deriv_sigmoids = []
for vals_for_input_vars in data_tuples_in_batch:
input_vars = self.independent_vars ## This is a list of vars for the input nodes. For the
## One-Neuron example in the Examples directory
## this is just the list [xa, xb, xc, xd]
vals_for_input_vars_dict = dict(zip(input_vars, list(vals_for_input_vars))) ## The current values at input
exp_obj = self.exp_objects[0] ## To understand this, first see the definition of the
## Exp class (search for the string "class Exp").
## Each expression that defines the neural network is
## represented by one Exp instance by the parser.
output_val = self.eval_expression(exp_obj.body , vals_for_input_vars_dict, self.vals_for_learnable_params)
output_val = output_val + self.bias
output_val = 1.0 / (1.0 + np.exp(-1.0 * output_val)) ## Apply sigmoid activation (output confined to [0.0,1.0] interval)
deriv_sigmoid = output_val * (1.0 - output_val) ## See Slide 59 for why we need partial deriv of Sigmoid at input point
output_vals.append(output_val) ## Collect output values for different input samples in batch
deriv_sigmoids.append(deriv_sigmoid) ## Collect the Sigmoid derivatives for each input sample in batch
## The derivatives that are saved during forward prop are shown on Slide 59.
return output_vals, deriv_sigmoids
def backprop_and_update_params_one_neuron_model(self, data_tuples_in_batch, predictions, y_errors_in_batch, deriv_sigmoids):
"""
This function implements the equations shown on Slide 61 of my Week 3 presentation in our DL
class at Purdue. All four parameters defined above are lists of what was either supplied to the
forward prop function or calculated by it for each training data sample in a batch.
"""
input_vars = self.independent_vars
input_vars_to_param_map = self.var_to_var_param[self.output_vars[0]]                   ## These two statements align the learnable params
param_to_vars_map = {param : var for var, param in input_vars_to_param_map.items()}    ## with the input vars they multiply
vals_for_learnable_params = self.vals_for_learnable_params
for i,param in enumerate(self.vals_for_learnable_params):
## For each param, sum the partials from every training data sample in batch
partial_of_loss_wrt_param = 0.0
for j in range(self.batch_size):
vals_for_input_vars_dict = dict(zip(input_vars, list(data_tuples_in_batch[j])))
partial_of_loss_wrt_param += - y_errors_in_batch[j] * vals_for_input_vars_dict[param_to_vars_map[param]] * deriv_sigmoids[j]
partial_of_loss_wrt_param /= float(self.batch_size)
step = self.learning_rate * partial_of_loss_wrt_param
## Update the learnable parameter by stepping against the gradient of the loss
self.vals_for_learnable_params[param] -= step
y_error_avg = sum(y_errors_in_batch) / float(self.batch_size)
deriv_sigmoid_avg = sum(deriv_sigmoids) / float(self.batch_size)
self.bias += self.learning_rate * y_error_avg * deriv_sigmoid_avg ## Update the bias
######################################################################################################
######################################################################################################
######################################## multi neuron model ##########################################
def run_training_loop_multi_neuron_model(self, training_data):
class DataLoader:
"""
To understand the logic of the dataloader, it would help if you first understand how
the training dataset is created. Search for the following function in this file:
gen_training_data(self)
As you will see in the implementation code for this method, the training dataset
consists of a Python dict with two keys, 0 and 1, the former points to a list of
all Class 0 samples and the latter to a list of all Class 1 samples. In each list,
the data samples are drawn from a multi-dimensional Gaussian distribution. The two
classes have different means and variances. The dimensionality of each data sample
is set by the number of nodes in the input layer of the neural network.
The data loader's job is to construct a batch of samples drawn randomly from the two
lists mentioned above. And it must also associate the class label with each sample
separately.
"""
def __init__(self, training_data, batch_size):
self.training_data = training_data
self.batch_size = batch_size
self.class_0_samples = [(item, 0) for item in self.training_data[0]] ## Associate label 0 with each sample
self.class_1_samples = [(item, 1) for item in self.training_data[1]] ## Associate label 1 with each sample
def __len__(self):
return len(self.training_data[0]) + len(self.training_data[1])
def _getitem(self):
cointoss = random.choice([0,1]) ## When a batch is created by getbatch(), we want the
## samples to be chosen randomly from the two lists
if cointoss == 0:
return random.choice(self.class_0_samples)
else:
return random.choice(self.class_1_samples)
def getbatch(self):
batch_data,batch_labels = [],[] ## First list for samples, the second for labels
maxval = 0.0 ## For approximate batch data normalization
for _ in range(self.batch_size):
item = self._getitem()
if np.max(item[0]) > maxval:
maxval = np.max(item[0])
batch_data.append(item[0])
batch_labels.append(item[1])
batch_data = [item/maxval for item in batch_data] ## Normalize batch data
batch = [batch_data, batch_labels]
return batch
## The training loop must first initialize the learnable parameters. Remember, these are the
## symbolic names in your input expressions for the neural layer that do not begin with the
## letter 'x'. In this case, we are initializing with random numbers from a uniform distribution
## over the interval (0,1):
self.vals_for_learnable_params = {param: random.uniform(0,1) for param in self.learnable_params}
## In the same manner, we must also initialize the biases at each node that aggregates forward
## propagating data:
self.bias = {i : [random.uniform(0,1) for j in range( self.layers_config[i] ) ] for i in range(1, self.num_layers)}
data_loader = DataLoader(training_data, batch_size=self.batch_size)
loss_running_record = []
i = 0
avg_loss_over_iterations = 0.0 ## Average the loss over iterations for printing out
## every N iterations during the training loop.
for i in range(self.training_iterations):
data = data_loader.getbatch()
data_tuples = data[0]
class_labels = data[1]
self.forward_prop_multi_neuron_model(data_tuples) ## FORW PROP works by side-effect
predicted_labels_for_batch = self.forw_prop_vals_at_layers[self.num_layers-1] ## Predictions from FORW PROP
y_preds = [item for sublist in predicted_labels_for_batch for item in sublist] ## Get numeric vals for predictions
loss = sum([(abs(class_labels[i] - y_preds[i]))**2 for i in range(len(class_labels))]) ## Calculate loss for batch
loss_avg = loss / float(len(class_labels)) ## Average the loss over batch
avg_loss_over_iterations += loss_avg ## Add to Average loss over iterations
if i%(self.display_loss_how_often) == 0:
avg_loss_over_iterations /= self.display_loss_how_often
loss_running_record.append(avg_loss_over_iterations)
print("[iter=%d] loss = %.4f" % (i+1, avg_loss_over_iterations)) ## Display avg loss
avg_loss_over_iterations = 0.0 ## Re-initialize avg-over-iterations loss
y_errors_in_batch = list(map(operator.sub, class_labels, y_preds))
self.backprop_and_update_params_multi_neuron_model(y_preds, y_errors_in_batch)
plt.figure()
plt.plot(loss_running_record)
plt.show()
def forward_prop_multi_neuron_model(self, data_tuples_in_batch):
"""
During forward propagation, we push each batch of the input data through the
network. In order to explain the logic of forward, consider the following network
layout with 4 nodes in the input layer, 2 nodes in the hidden layer, and 1 node in
the output layer:

                  input
                    x                               x  = node
                    x          x|                   |  = sigmoid activation
                                          x|
                    x          x|
                    x
                 layer_0     layer_1    layer_2
In the code shown below, the expressions to evaluate for computing the
pre-activation values at a node are stored at the layer in which the nodes reside.
That is, the dictionary look-up "self.layer_exp_objects[layer_index]" returns the
Expression objects for which the left-side dependent variable is in the layer
pointed to by layer_index. So, for the example shown above, "self.layer_exp_objects[1]"
will return two Expression objects, one for each of the two nodes in the second
layer of the network (that is, layer indexed 1).
The pre-activation values obtained by evaluating the expressions at each node are
then subject to Sigmoid activation, followed by the calculation of the partial
derivative of the output of the Sigmoid function with respect to its input.
In the forward pass, the values calculated for the nodes in each layer are stored in
the dictionary
                    self.forw_prop_vals_at_layers[ layer_index ]
and the gradient values calculated at the same nodes are stored in the dictionary
                    self.gradient_vals_for_layers[ layer_index ]
"""
self.forw_prop_vals_at_layers = {i : [] for i in range(self.num_layers)} ## stores the forw propagated vals in all layers
self.gradient_vals_for_layers = {i : [] for i in range(1, self.num_layers)} ## stores sigmoid gradients for all layers except first
for vals_for_input_vars in data_tuples_in_batch:
self.forw_prop_vals_at_layers[0].append(vals_for_input_vars)
for layer_index in range(1, self.num_layers):
input_vars = self.layer_vars[layer_index-1]
if layer_index == 1:
vals_for_input_vars_dict = dict(zip(input_vars, list(vals_for_input_vars)))
output_vals_arr = [] ## This stores the outputs in the current layer
gradients_val_arr = [] ## This stores the gradients of Sigmoid in the current layer
## In the following loop for forward propagation calculations, exp_obj is the Exp object that
## is created for each user-supplied expression that specifies the network. See the definition
## of the class Exp (for 'Expression') by searching for "class Exp". It is also shown on Slide 65.
for k,exp_obj in enumerate(self.layer_exp_objects[layer_index]):
output_val = self.eval_expression(exp_obj.body , vals_for_input_vars_dict, self.vals_for_learnable_params, input_vars)
output_val = output_val + self.bias[layer_index][k]
## Apply sigmoid activation:
output_val = 1.0 / (1.0 + np.exp(-1.0 * output_val))
output_vals_arr.append(output_val)
## Calculate the partial of the activation function Sigmoid as a function of its input:
deriv_sigmoid = output_val * (1.0 - output_val)
gradients_val_arr.append(deriv_sigmoid)
vals_for_input_vars_dict[ exp_obj.dependent_var ] = output_val
## You have one "output_vals_arr" for each training data sample in batch:
self.forw_prop_vals_at_layers[layer_index].append(output_vals_arr)
## See the bullets in red on Slides 70 and 72 of my Week 3 slides for what needs
## to be stored during the forward propagation of data through the network.
## IMPORTANT: You have one "gradients_val_arr" for each training data sample in batch:
self.gradient_vals_for_layers[layer_index].append(gradients_val_arr)
def backprop_and_update_params_multi_neuron_model(self, predictions, y_errors):
"""
First note that loop index variable 'back_layer_index' starts with the index of
the last layer. For the 3-layer example shown for 'forward', back_layer_index
starts with a value of 2, its next value is 1, and that's it.
In the code below, the outermost loop is over the data samples in a batch. As shown
on Slide 73 of my Week 3 lecture, in order to calculate the partials of Loss with
respect to the learnable params, we need to backprop the prediction errors and
the gradients of the Sigmoid. For the purpose of satisfying the requirements of
SGD, the backprop of the prediction errors and the gradients needs to be carried
out separately for each training data sample in a batch. That's what the outer
loop is for.
After we exit the outermost loop, we average over the results obtained from each
training data sample in a batch.
Pay attention to the variable 'vars_in_layer'. It stores the node variables in
the current layer during backpropagation.
"""
## Eq. (24) on Slide 73 of my Week 3 lecture says we need to store backproped errors in each layer leading up to the last:
pred_err_backproped_at_layers = [ {i : [None for j in range( self.layers_config[i] ) ]
for i in range(self.num_layers)} for _ in range(self.batch_size) ]
## This will store "\delta L / \delta w" you see at the LHS of the equations on Slide 73:
partial_of_loss_wrt_params = {param : 0.0 for param in self.all_params}
## For estimating the changes to the bias to be made on the basis of the derivatives of the Sigmoids:
bias_changes = {i : [0.0 for j in range( self.layers_config[i] ) ] for i in range(1, self.num_layers)}
for b in range(self.batch_size):
pred_err_backproped_at_layers[b][self.num_layers - 1] = [ y_errors[b] ]
for back_layer_index in reversed(range(1,self.num_layers)): ## For the 3-layer network, the first val for back_layer_index is 2 for the 3rd layer
input_vals = self.forw_prop_vals_at_layers[back_layer_index -1]     ## For the example network, a list of batch_size two-element lists --- since we have two nodes in the 2nd layer
deriv_sigmoids = self.gradient_vals_for_layers[back_layer_index]    ## A list of batch_size one-element lists, one for each training sample in the batch
vars_in_layer = self.layer_vars[back_layer_index] ## A list like ['xo']
vars_in_next_layer_back = self.layer_vars[back_layer_index - 1] ## A list like ['xw', 'xz']
vals_for_input_vars_dict = dict(zip(vars_in_next_layer_back, self.forw_prop_vals_at_layers[back_layer_index - 1][b]))
## For the next statement, note that layer_params are stored in a dict like
## {1: [['ap', 'aq', 'ar', 'as'], ['bp', 'bq', 'br', 'bs']], 2: [['cp', 'cq']]}
## "layer_params[idx]" is a list of lists for the link weights in layer whose output nodes are in layer "idx"
layer_params = self.layer_params[back_layer_index]
transposed_layer_params = list(zip(*layer_params)) ## Creating a transpose of the link matrix, See Eq. 30 on Slide 77
for k,var1 in enumerate(vars_in_next_layer_back):
for j,var2 in enumerate(vars_in_layer):
pred_err_backproped_at_layers[b][back_layer_index - 1][k] = sum([self.vals_for_learnable_params[transposed_layer_params[k][i]]
* pred_err_backproped_at_layers[b][back_layer_index][i]
for i in range(len(vars_in_layer))])
for j,var in enumerate(vars_in_layer):
layer_params = self.layer_params[back_layer_index][j] ## ['cp', 'cq'] for the end layer
input_vars_to_param_map = self.var_to_var_param[var] ## These two statements align the {'xw': 'cp', 'xz': 'cq'}
param_to_vars_map = {param : var for var, param in input_vars_to_param_map.items()} ## and the input vars {'cp': 'xw', 'cq': 'xz'}
## Update the partials of Loss wrt to the learnable parameters between the current layer
## and the previous layer. You are accumulating these partials over the different training
## data samples in the batch being processed. For each training data sample, the formula
## being used is shown in Eq. (29) on Slide 77 of my Week 3 slides:
for i,param in enumerate(layer_params):
partial_of_loss_wrt_params[param] += pred_err_backproped_at_layers[b][back_layer_index][j] * \
vals_for_input_vars_dict[param_to_vars_map[param]] * deriv_sigmoids[b][j]
## We will now estimate the change in the bias that needs to be made at each node in the previous layer
## from the derivatives the sigmoid at the nodes in the current layer and the prediction error as
## backproped to the previous layer nodes:
for k,var1 in enumerate(vars_in_next_layer_back):
for j,var2 in enumerate(vars_in_layer):
if back_layer_index-1 > 0:
bias_changes[back_layer_index-1][k] += pred_err_backproped_at_layers[b][back_layer_index - 1][k] * deriv_sigmoids[b][j]
## Now update the learnable parameters. The loop shown below carries out SGD mandated averaging
for param in partial_of_loss_wrt_params:
partial_of_loss_wrt_param = partial_of_loss_wrt_params[param] / float(self.batch_size)
step = self.learning_rate * partial_of_loss_wrt_param
self.vals_for_learnable_params[param] += step
## Finally we update the biases at all the nodes that aggregate data:
for layer_index in range(1,self.num_layers):
for k in range(self.layers_config[layer_index]):
self.bias[layer_index][k] += self.learning_rate * ( bias_changes[layer_index][k] / float(self.batch_size) )
######################################################################################################
######################################################################################################
############################# torch.nn based experiments for verification ############################
def run_training_with_torchnn(self, option, training_data):
"""
The value of the parameter 'option' must be either 'one_neuron' or 'multi_neuron'.
For either option, the number of input nodes is determined by the expressions supplied to the
constructor of the class ComputationalGraphPrimer.
When the option value is 'one_neuron', we use the OneNeuronNet for the learning network and
when the option is 'multi_neuron' we use the MultiNeuronNet.
Assuming that the number of input nodes specified by the expressions is 4, the MultiNeuronNet
class creates the following network layout in which we have 2 nodes in the hidden layer and
one node for the final output:
                  input
                    x                               x  = node
                    x          x|                   |  = ReLU activation
                                          x|
                    x          x|
                    x
                 layer_0     layer_1    layer_2
"""
class DataLoader:
"""
To understand the logic of the dataloader, it would help if you first understand how
the training dataset is created. Search for the following function in this file:
gen_training_data(self)
As you will see in the implementation code for this method, the training dataset
consists of a Python dict with two keys, 0 and 1, the former points to a list of
all Class 0 samples and the latter to a list of all Class 1 samples. In each list,
the data samples are drawn from a multi-dimensional Gaussian distribution. The two
classes have different means and variances. The dimensionality of each data sample
is set by the number of nodes in the input layer of the neural network.
The data loader's job is to construct a batch of samples drawn randomly from the two
lists mentioned above. And it must also associate the class label with each sample
separately.
"""
def __init__(self, training_data, batch_size):
self.training_data = training_data
self.batch_size = batch_size
self.class_0_samples = [(item, 0) for item in self.training_data[0]] ## Associate label 0 with each sample
self.class_1_samples = [(item, 1) for item in self.training_data[1]] ## Associate label 1 with each sample
def __len__(self):
return len(self.training_data[0]) + len(self.training_data[1])
def _getitem(self):
cointoss = random.choice([0,1]) ## When a batch is created by getbatch(), we want the
## samples to be chosen randomly from the two lists
if cointoss == 0:
return random.choice(self.class_0_samples)
else:
return random.choice(self.class_1_samples)
def getbatch(self):
batch_data,batch_labels = [],[] ## First list for samples, the second for labels
maxval = 0.0 ## For approximate batch data normalization
for _ in range(self.batch_size):
item = self._getitem()
if np.max(item[0]) > maxval:
maxval = np.max(item[0])
batch_data.append(item[0])
batch_labels.append(item[1])
batch_data = [item/maxval for item in batch_data] ## Normalize batch data
batch = [batch_data, batch_labels]
return batch
data_loader = DataLoader(training_data, batch_size=self.batch_size)
class OneNeuronNet(torch.nn.Module):
"""
This class is used when the parameter 'option' is set to 'one_neuron' in the call to
this training function.
"""
def __init__(self, D_in, D_out):
torch.nn.Module.__init__( self )
self.linear = torch.nn.Linear(D_in, D_out)
self.sigmoid = torch.nn.Sigmoid()
def forward(self, x):
h_out = self.linear(x)
y_pred = self.sigmoid(h_out)
return y_pred
class MultiNeuronNet(torch.nn.Module):
"""
This class is used when the parameter 'option' is set to 'multi_neuron' in the call to
this training function.
"""
def __init__(self, D_in, H, D_out):
torch.nn.Module.__init__( self )
self.linear1 = torch.nn.Linear(D_in, H)
self.linear2 = torch.nn.Linear(H, D_out)
def forward(self, x):
h_relu = self.linear1(x).clamp(min=0)
y_pred = self.linear2(h_relu)
return y_pred
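## For reference, a functionally equivalent way of expressing the two small networks above with
## torch.nn.Sequential (a sketch only; it is not what this method actually instantiates):
##
##        one_neuron_net   = torch.nn.Sequential( torch.nn.Linear(D_in, D_out), torch.nn.Sigmoid() )
##        multi_neuron_net = torch.nn.Sequential( torch.nn.Linear(D_in, H), torch.nn.ReLU(), torch.nn.Linear(H, D_out) )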
loss_running_record = []
i = 0
avg_loss_over_iterations = 0.0
if option == 'one_neuron':
N,D_in,D_out = self.batch_size,self.input_size,self.output_size
model = OneNeuronNet(D_in,D_out)
elif option == 'multi_neuron':
N,D_in,H,D_out = self.batch_size,self.input_size,2,self.output_size
model = MultiNeuronNet(D_in,H,D_out)
else:
sys.exit("\n\nThe value of the parameter 'option' not recognized\n\n")
criterion = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.SGD(model.parameters(), self.learning_rate)
for i in range(self.training_iterations):
data = data_loader.getbatch()
data_tuples = torch.FloatTensor( data[0] )
class_labels = torch.FloatTensor( data[1] )
# We need to convert the labels from shape torch.Size([batch_size]) to torch.Size([batch_size, 1]) to match y_preds:
class_labels = torch.unsqueeze(class_labels, 1)
y_preds = model(data_tuples)
loss = criterion(y_preds, class_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
avg_loss_over_iterations += loss.item()
if i%(self.display_loss_how_often) == 0:
avg_loss_over_iterations /= self.display_loss_how_often
loss_running_record.append(avg_loss_over_iterations)
print("[iter=%d] loss = %.4f" % (i+1, avg_loss_over_iterations))
avg_loss_over_iterations = 0.0
plt.figure()
plt.plot(loss_running_record)
plt.show()
######################################################################################################
######################################################################################################
##################### experimenting with dataflows in a general graph of nodes #####################
def train_on_all_data(self):
'''
The purpose of this method is to call forward_propagate_one_input_sample_with_partial_deriv_calc()
repeatedly on all input/output ground-truth training data pairs generated by the method
gen_gt_dataset(). The call to the forward_propagate...() method returns the predicted value
at the output nodes from the supplied values at the input nodes. The "train_on_all_data()"
method calculates the error associated with the predicted value. The call to
forward_propagate...() also returns the partial derivatives estimated by using the finite
difference method in the computational graph. Using the partial derivatives, the
"train_on_all_data()" backpropagates the loss to the interior nodes in the computational graph
and updates the values for the learnable parameters.
'''
self.vals_for_learnable_params = {var: random.uniform(0,1) for var in self.learnable_params}
print("\n\nInitial values for all learnable parameters: %s" % str(self.vals_for_learnable_params))
for sample_index in range(self.dataset_size):
if (sample_index > 0) and (sample_index % self.display_loss_how_often == 0):
print("\n\n============ [Forward Propagation] Training with sample indexed: %d ===============" % sample_index)
input_vals_for_ind_vars = {var: self.dataset_input_samples[sample_index][var] for var in self.independent_vars}
if (sample_index > 0) and (sample_index % self.display_loss_how_often == 0):
print("\ninput values for independent variables: ", input_vals_for_ind_vars)
predicted_output_vals, partial_var_to_param, partial_var_to_var = \
self.forward_propagate_one_input_sample_with_partial_deriv_calc(sample_index, input_vals_for_ind_vars)
error = [self.true_output_vals[sample_index][var] - predicted_output_vals[var] for var in self.output_vars]
loss = np.linalg.norm(error)
if self.debug:
print("\nloss for training sample indexed %d: %s" % (sample_index, str(loss)))
self.LOSS.append(loss)
## The following code block prints out the loss and the gradients every, say, 1000 iterations during training:
if (sample_index > 0) and (sample_index % self.display_loss_how_often == 0):
print("\npredicted value at the output nodes: ", predicted_output_vals)
print("\nloss for training sample indexed %d: %s" % (sample_index, str(loss)))
print("\nestimated partial derivatives of vars wrt learnable parameters:")
for k,v in partial_var_to_param.items():
print("\nk=%s v=%s" % (k, str(v)))
print("\nestimated partial derivatives of vars wrt other vars:")
for k,v in partial_var_to_var.items():
print("\nk=%s v=%s" % (k, str(v)))
## The rest of the code in this function is for backpropagating the loss and updating the learnable params:
## To update the value of each learnable parameter during backprop, we need to find all paths to where
## that parameter resides in the graph from each output node:
paths = {param : [] for param in self.learnable_params}
## The partial_var_to_param dict is returned by forward_propagate...(). It holds the partial derivative of each var with
## respect to every learnable param at each node of the graph. That is, each node in the graph maintains its own such dict.
for var1 in partial_var_to_param:
for var2 in partial_var_to_param[var1]:
for param in self.learnable_params:
if partial_var_to_param[var1][var2][param] is not None:
paths[param] += [var1,var2,param]
for param in paths:
node = paths[param][0]
if node in self.output_vars:
continue
for var_out in self.output_vars:
if node in self.depends_on[var_out]:
## this tells us which output node contributes to a param -- since the
## backprop for a param must start at each of those nodes
paths[param].insert(0,var_out)
else:
for node2 in self.depends_on[var_out]:
if node in self.depends_on[node2]:
paths[param].insert(0,node2)
paths[param].insert(0,var_out)
## We now multiply the partials along the paths for gradient value for each param:
for param in self.learnable_params:
product_of_partials = 1.0
for i in range(len(paths[param]) - 2):
var1 = paths[param][i]
var2 = paths[param][i+1]
product_of_partials *= partial_var_to_var[var1][var2]
if self.debug:
print("\n\nfor param=%s, product of partials: %s" % str(product_of_partials))
product_of_partials *= partial_var_to_param[var1][var2][param]
self.vals_for_learnable_params[param] -= self.learning_rate * product_of_partials
if (sample_index > 0) and (sample_index % self.display_loss_how_often == 0):
print("\n\n[sample index: %d]: input val: %s vals for learnable parameters: %s" % \
(sample_index, str(input_vals_for_ind_vars), str(self.vals_for_learnable_params)))
def forward_propagate_one_input_sample_with_partial_deriv_calc(self, sample_index, input_vals_for_ind_vars):
'''
If you want to look at how the information flows in the DAG when you don't have to worry about
estimating the partial derivatives, see the method gen_gt_dataset(). As you will notice in the
implementation code for that method, there is nothing much to pushing the input values through
the nodes and the arcs of a computational graph if we are not concerned about estimating the
partial derivatives.
On the other hand, if you want to see how one might also estimate the partial derivatives
during the forward flow of information in a computational graph, the forward_propagate...()
presented here is the method to examine. We first split the expression that the node
variable depends on into its constituent parts on the basis of '+' and '-' operators and
subsequently, for each part, we estimate the partial of the node variable with respect
to the variables and the learnable parameters in that part.
The needed partial derivatives are all calculated using the finite difference method in which
you add a small grad_delta value to the value of the variable with respect to which you are
calculating the partial and you then estimate the resulting change at the node in question.
The change divided by grad_delta is the partial derivative you are looking for.
'''
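## The finite-difference estimate used below, for the value f of a node and a quantity q (a variable
## or a learnable parameter) appearing in the expression at that node:
##
##        partial of f wrt q   =   ( f(q + grad_delta) - f(q) ) / grad_delta
##
## For example, if the relevant part of the expression is 3*q with q = 2.0 and grad_delta = 1e-4,
## the estimate is (6.0003 - 6.0) / 1e-4 = 3.0, as expected.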
predicted_output_vals = {var : None for var in self.output_vars}
vals_for_dependent_vars = {var: None for var in self.all_vars if var not in self.independent_vars}
partials_var_to_param = {var : {var : {ele: None for ele in self.learnable_params} for
var in self.all_vars} for var in self.all_vars}
partials_var_to_var = {var : {var : None for var in self.all_vars} for var in self.all_vars}
while any(v is None for v in [vals_for_dependent_vars[x] for x in vals_for_dependent_vars]):
for var1 in self.all_vars:
if var1 in self.dependent_vars and vals_for_dependent_vars[var1] is None: continue
for var2 in self.leads_to[var1]:
if any([vals_for_dependent_vars[var] is None for var in self.depends_on[var2]
if var not in self.independent_vars]): continue
exp = self.expressions_dict[var2]
learnable_params_in_exp = [ele for ele in self.learnable_params if ele in exp]
## in order to calculate the partials of the node (each node stands for a variable)
## values with respect to the learnable params, and, then, with respect to the
## source vars, we must break the exp at '+' and '-' operators:
parts = re.split(r'\+|-', exp)
if self.debug:
print("\n\n ====for var2=%s ================= for exp=%s parts: %s" % (var2, str(exp), str(parts)))
vals_for_parts = []
for part in parts:
splits_at_arith = re.split(r'\*|/', part)
if len(splits_at_arith) > 1:
operand1 = splits_at_arith[0]
operand2 = splits_at_arith[1]
if '^' in operand1:
operand1 = operand1[:operand1.find('^')]
if '^' in operand2:
operand2 = operand2[:operand2.find('^')]
if operand1.startswith('x'):
var_in_part = operand1
param_in_part = operand2
elif operand2.startswith('x'):
var_in_part = operand2
param_in_part = operand1
else:
sys.exit("you are not following the convention -- aborting")
else:
if '^' in part:
ele_in_part = part[:part.find('^')]
if ele_in_part.startswith('x'):
var_in_part = ele_in_part
param_in_part = ""
else:
param_in_part = ele_in_part
var_in_part = ""
else:
if part.startswith('x'):
var_in_part = part
param_in_part = ""
else:
param_in_part = part
var_in_part = ""
if self.debug:
print("\n\n\nvar_in_part: %s para_in_part=%s" % (var_in_part, param_in_part))
part_for_partial_var2var = copy.deepcopy(part)
part_for_partial_var2param = copy.deepcopy(part)
if self.debug:
print("\n\nSTEP1a: part: %s of exp: %s" % (part, exp))
print("STEP1b: part_for_partial_var2var: %s of exp: %s" % (part_for_partial_var2var, exp))
print("STEP1c: part_for_partial_var2param: %s of exp: %s" % (part_for_partial_var2param, exp))
if var_in_part in self.independent_vars:
part = part.replace(var_in_part, str(input_vals_for_ind_vars[var_in_part]))
part_for_partial_var2var = part_for_partial_var2var.replace(var_in_part,
str(input_vals_for_ind_vars[var_in_part] + self.grad_delta))
part_for_partial_var2param = part_for_partial_var2param.replace(var_in_part,
str(input_vals_for_ind_vars[var_in_part]))
if self.debug:
print("\n\nSTEP2a: part: %s of exp=%s" % (part, exp))
print("STEP2b: part_for_partial_var2var: %s of exp=%s" % (part_for_partial_var2var, exp))
print("STEP2c: part_for_partial_var2param: %s of exp=%s" %
(part_for_partial_var2param, exp))
if var_in_part in self.dependent_vars:
if vals_for_dependent_vars[var_in_part] is not None:
part = part.replace(var_in_part, str(vals_for_dependent_vars[var_in_part]))
part_for_partial_var2var = part_for_partial_var2var.replace(var_in_part,
str(vals_for_dependent_vars[var_in_part] + self.grad_delta))
part_for_partial_var2param = part_for_partial_var2param.replace(var_in_part,
str(vals_for_dependent_vars[var_in_part]))
if self.debug:
print("\n\nSTEP3a: part=%s of exp: %s" % (part, exp))
print("STEP3b: part_for_partial_var2var=%s of exp: %s" % (part_for_partial_var2var, exp))
print("STEP3c: part_for_partial_var2param: %s of exp=%s" % (part_for_partial_var2param, exp))
## now we do the same thing wrt the learnable parameters:
if param_in_part != "" and param_in_part in self.learnable_params:
if self.vals_for_learnable_params[param_in_part] is not None:
part = part.replace(param_in_part, str(self.vals_for_learnable_params[param_in_part]))
part_for_partial_var2var = part_for_partial_var2var.replace(param_in_part,
str(self.vals_for_learnable_params[param_in_part]))
part_for_partial_var2param = part_for_partial_var2param.replace(param_in_part,
str(self.vals_for_learnable_params[param_in_part] + self.grad_delta))
if self.debug:
print("\n\nSTEP4a: part: %s of exp: %s" % (part, exp))
print("STEP4b: part_for_partial_var2var=%s of exp: %s" %
(part_for_partial_var2var, exp))
print("STEP4c: part_for_partial_var2param=%s of exp: %s" %
(part_for_partial_var2param, exp))
### Now evaluate the part for each of the three cases (the var2param case arises only when the part contains a learnable param):
evaled_part = eval( part.replace('^', '**') )
vals_for_parts.append(evaled_part)
evaled_partial_var2var = eval( part_for_partial_var2var.replace('^', '**') )
if param_in_part != "":
evaled_partial_var2param = eval( part_for_partial_var2param.replace('^', '**') )
partials_var_to_var[var2][var_in_part] = (evaled_partial_var2var - evaled_part) / self.grad_delta
if param_in_part != "":
partials_var_to_param[var2][var_in_part][param_in_part] = \
(evaled_partial_var2param - evaled_part) / self.grad_delta
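## Both quotients above are one-sided finite-difference estimates: each operand is perturbed
## by self.grad_delta, the part is re-evaluated, and the change in its value is divided by
## self.grad_delta to approximate the partial derivative.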
vals_for_dependent_vars[var2] = sum(vals_for_parts)
predicted_output_vals = {var : vals_for_dependent_vars[var] for var in self.output_vars}
return predicted_output_vals, partials_var_to_param, partials_var_to_var
######################################################################################################
## New in Version 1.0.6:
######################################################################################################
####################### Start Definition of Inner Class AutogradCustomization ######################
class AutogradCustomization(torch.nn.Module):
"""
This class illustrates how you can extend the functionality of Autograd by
following the instructions posted at
https://pytorch.org/docs/stable/notes/extending.html
"""
def __init__(self, cgp, num_samples_per_class):
super(ComputationalGraphPrimer.AutogradCustomization, self).__init__()
self.cgp = cgp
self.num_samples_per_class = num_samples_per_class
class DoSillyWithTensor(torch.autograd.Function):
"""
Extending Autograd requires that you define a new verb class, as I have done here with
the class DoSillyWithTensor, with definitions for the two static methods "forward()"
and "backward()". The class is invoked through its "apply" attribute, which gives you a
callable. So when, in the "forward()" of the network, you pass a training sample through
that callable, the sample is subject to the code shown below in the "forward()" of this
class.
"""
@staticmethod
def forward(ctx, input):
"""
The parameter 'input' is set to the training sample that is being
processed by an instance of DoSillyWithTensor in the "forward()" of a
network. We first make a deep copy of this tensor (which should be a
32-bit float) and then we subject the copy to a conversion to a one-byte
integer, which should cause a significant loss of information. We
calculate the difference between the original 32-bit float and the 8-bit
version and store it away in the context variable "ctx".
"""
input_orig = input.clone().double()
input = input.to(torch.uint8).double()
diff = input_orig.sub(input)
ctx.save_for_backward(diff)
return input
@staticmethod
def backward(ctx, grad_output):
"""
Whatever was stored in the context variable "ctx" during the forward pass
can be retrieved in the backward pass as shown below.
"""
diff, = ctx.saved_tensors
grad_input = grad_output.clone()
grad_input = grad_input + diff
return grad_input
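## Note: DoSillyWithTensor is not instantiated directly; a callable handle to it is obtained
## through its "apply" attribute, as done in train_with_extended_autograd() below.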
def gen_training_data(self):
mean1,mean2 = [3.0,3.0], [5.0,5.0]
covar1,covar2 = [[1.0,0.0], [0.0,1.0]], [[1.0,0.0], [0.0,1.0]]
data1 = [(list(x),1) for x in np.random.multivariate_normal(mean1, covar1, self.num_samples_per_class)]
data2 = [(list(x),2) for x in np.random.multivariate_normal(mean2, covar2, self.num_samples_per_class)]
training_data = data1 + data2
random.shuffle( training_data )
self.training_data = training_data
def train_with_straight_autograd(self):
dtype = torch.float
D_in,H,D_out = 2,10,2
# w1 = torch.randn(D_in, H, device="cpu", dtype=dtype, requires_grad=True)
# w2 = torch.randn(H, D_out, device="cpu", dtype=dtype, requires_grad=True)
w1 = torch.randn(D_in, H, device="cpu", dtype=dtype)
w2 = torch.randn(H, D_out, device="cpu", dtype=dtype)
w1 = w1.to(self.cgp.device)
w2 = w2.to(self.cgp.device)
w1.requires_grad_()
w2.requires_grad_()
Loss = []
for epoch in range(self.cgp.epochs):
for i,data in enumerate(self.training_data):
input, label = data
x,y = torch.as_tensor(np.array(input)), torch.as_tensor(np.array(label))
x,y = x.float(), y.float()
if self.cgp.device:
x,y = x.to(self.cgp.device), y.to(self.cgp.device)
y_pred = x.view(1,-1).mm(w1).clamp(min=0).mm(w2)
loss = (y_pred - y).pow(2).sum()
if i % 200 == 199:
Loss.append(loss.item())
print("epoch=%d i=%d" % (epoch,i), loss.item())
# w1.retain_grad()
# w2.retain_grad()
loss.backward()
with torch.no_grad():
w1 -= self.cgp.learning_rate * w1.grad
w2 -= self.cgp.learning_rate * w2.grad
w1.grad.zero_()
w2.grad.zero_()
print("\n\n\nLoss: %s" % str(Loss))
import matplotlib.pyplot as plt
plt.figure("Loss vs training (straight autograd)")
plt.plot(Loss)
plt.show()
def train_with_extended_autograd(self):
dtype = torch.float
D_in,H,D_out = 2,10,2
# w1 = torch.randn(D_in, H, device="cpu", dtype=dtype, requires_grad=True)
# w2 = torch.randn(H, D_out, device="cpu", dtype=dtype, requires_grad=True)
w1 = torch.randn(D_in, H, device="cpu", dtype=dtype)
w2 = torch.randn(H, D_out, device="cpu", dtype=dtype)
w1 = w1.to(self.cgp.device)
w2 = w2.to(self.cgp.device)
w1.requires_grad_()
w2.requires_grad_()
Loss = []
for epoch in range(self.cgp.epochs):
for i,data in enumerate(self.training_data):
## Obtain a callable handle to DoSillyWithTensor through its 'apply' attribute:
do_silly = ComputationalGraphPrimer.AutogradCustomization.DoSillyWithTensor.apply
input, label = data
x,y = torch.as_tensor(np.array(input)), torch.as_tensor(np.array(label))
## Now process the training instance with the "do_silly" instance:
x = do_silly(x)
x,y = x.float(), y.float()
x,y = x.to(self.cgp.device), y.to(self.cgp.device)
y_pred = x.view(1,-1).mm(w1).clamp(min=0).mm(w2)
loss = (y_pred - y).pow(2).sum()
if i % 200 == 199:
Loss.append(loss.item())
print("epoch=%d i=%d" % (epoch,i), loss.item())
# w1.retain_grad()
# w2.retain_grad()
loss.backward()
with torch.no_grad():
w1 -= self.cgp.learning_rate * w1.grad
w2 -= self.cgp.learning_rate * w2.grad
w1.grad.zero_()
w2.grad.zero_()
print("\n\n\nLoss: %s" % str(Loss))
import matplotlib.pyplot as plt
plt.figure("loss vs training (extended autograd)")
plt.plot(Loss)
plt.show()
######################################################################################################
######################################################################################################
###################################### Utility Functions ############################################
def calculate_loss(self, predicted_val, true_val):
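## Returns the L2 norm of the prediction error; works for scalar as well as vector outputs.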
error = true_val - predicted_val
loss = np.linalg.norm(error)
return loss
def plot_loss(self):
plt.figure()
plt.plot(self.LOSS)
plt.show()
def eval_expression(self, exp, vals_for_vars, vals_for_learnable_params, ind_vars=None):
self.debug1 = False
if self.debug1:
print("\n\nSTEP1: [original expression] exp: %s" % exp)
if ind_vars is not None:
for var in ind_vars:
exp = exp.replace(var, str(vals_for_vars[var]))
else:
for var in self.independent_vars:
exp = exp.replace(var, str(vals_for_vars[var]))
if self.debug1:
print("\n\nSTEP2: [replaced ars by their vals] exp: %s" % exp)
for ele in self.learnable_params:
exp = exp.replace(ele, str(vals_for_learnable_params[ele]))
if self.debug1:
print("\n\nSTEP4: [replaced learnable params by their vals] exp: %s" % exp)
return eval( exp.replace('^', '**') )
def gen_gt_dataset(self, vals_for_learnable_params={}):
'''
This method illustrates that it is trivial to forward-propagate the information through
the computational graph if you are not concerned about estimating the partial derivatives
at the same time. This method is used to generate 'dataset_size' number of input/output
values for the computational graph for given values for the learnable parameters.
'''
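## A hypothetical call (it assumes the learnable params in the user-supplied expressions
## were named 'ab' and 'bc'):
##
##     cgp.gen_gt_dataset(vals_for_learnable_params={'ab': 0.85, 'bc': 0.33})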
N = self.dataset_size
for i in range(N):
if self.debug:
print("\n\n\n================== Gen GT: iteration %d ============================\n" % i)
# vals_for_ind_vars = {var: random.uniform(0,1) for var in self.independent_vars}
vals_for_ind_vars = {var: random.uniform(-1,1) for var in self.independent_vars}
self.dataset_input_samples[i] = vals_for_ind_vars
vals_for_dependent_vars = {var: None for var in self.all_vars if var not in self.independent_vars}
while True:
if not any(v is None for v in vals_for_dependent_vars.values()):
break
for var1 in self.all_vars:
for var2 in self.leads_to[var1]:
if vals_for_dependent_vars[var2] is not None: continue
predecessor_vars = self.depends_on[var2]
predecessor_vars_without_inds = [x for x in predecessor_vars if x not in self.independent_vars]
if any(vals_for_dependent_vars[vart] is None for vart in predecessor_vars_without_inds): continue
exp = self.expressions_dict[var2]
if self.debug:
print("\n\nSTEP1: [original expression] exp: %s" % exp)
for var in self.independent_vars:
exp = exp.replace(var, str(vals_for_ind_vars[var]))
if self.debug:
print("\n\nSTEP2: [replaced independent vars by their vals] exp: %s" % exp)
for var in self.dependent_vars:
if vals_for_dependent_vars[var] is not None:
exp = exp.replace(var, str(vals_for_dependent_vars[var]))
if self.debug:
print("\n\nSTEP3: [replaced dependent vars by their vals] exp: %s" % exp)
for ele in self.learnable_params:
exp = exp.replace(ele, str(vals_for_learnable_params[ele]))
if self.debug:
print("\n\nSTEP4: [replaced learnable params by their vals] exp: %s" % exp)
vals_for_dependent_vars[var2] = eval( exp.replace('^', '**') )
self.true_output_vals[i] = {ovar : vals_for_dependent_vars[ovar] for ovar in self.output_vars}
if self.debug:
print("\n\n\ninput samples: %s" % str(self.dataset_input_samples))
print("\n\n\noutput vals: %s" % str(self.true_output_vals))
def gen_training_data(self):
"""
This 2-class dataset is used for the demos in the following Examples directory scripts:
one_neuron_classifier.py
multi_neuron_classifier.py
The classes are labeled 0 and 1. All of the data for class 0 is a list of samples
associated with the key 0 and, similarly, all of the data for class 1 is a list of
samples associated with the key 1; each sample holds one value per input variable.
For each class, the samples start out as standard normal (zero mean and unit variance).
For class 0 we then shift the mean to 2.0; for class 1 we multiply the values by 2.0
(which quadruples the variance) and shift the mean to 4.0.
"""
num_input_vars = len(self.independent_vars)
training_data_class_0 = []
training_data_class_1 = []
for i in range(self.dataset_size //2):
# Standard normal means N(0,1), i.e., zero mean and unit variance;
# nearly all such values fall in the interval [-3.0,+3.0]
for_class_0 = np.random.standard_normal( num_input_vars )
for_class_1 = np.random.standard_normal( num_input_vars )
# shift the class_0 mean to +2.0; for class_1, double the spread (standard deviation)
# and shift the mean to +4.0:
for_class_0 = for_class_0 + 2.0
for_class_1 = for_class_1 * 2 + 4.0
training_data_class_0.append( for_class_0 )
training_data_class_1.append( for_class_1 )
training_data = {0 : training_data_class_0, 1 : training_data_class_1}
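## Sanity check on the returned structure (hypothetical numbers): with dataset_size = 1000,
## len(training_data[0]) == len(training_data[1]) == 500, and each list element is an
## ndarray of length num_input_vars.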
return training_data
def gen_gt_dataset_with_activations(self, vals_for_learnable_params={}):
'''
This method illustrates that it is trivial to forward-propagate the information through
the computational graph if you are not concerned about estimating the partial derivatives
at the same time. This method is used to generate 'dataset_size' number of input/output
values for the computational graph for given values for the learnable parameters. Unlike
gen_gt_dataset(), this variant passes each freshly computed dependent-variable value through
a ReLU (negative values are clamped to zero) before it is propagated further through the graph.
'''
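## For example (made-up numbers), a node value that evaluates to -0.3 in the loop below is
## stored as 0 after the ReLU clamp, whereas a value of 0.7 is kept as is.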
N = self.dataset_size
for i in range(N):
if self.debug:
print("\n\n\n================== Gen GT: iteration %d ============================\n" % i)
# vals_for_ind_vars = {var: random.uniform(0,1) for var in self.independent_vars}
vals_for_ind_vars = {var: random.uniform(-1,1) for var in self.independent_vars}
self.dataset_input_samples[i] = vals_for_ind_vars
vals_for_dependent_vars = {var: None for var in self.all_vars if var not in self.independent_vars}
while True:
if not any(v is None for v in vals_for_dependent_vars.values()):
break
for var1 in self.all_vars:
for var2 in self.leads_to[var1]:
if vals_for_dependent_vars[var2] is not None: continue
predecessor_vars = self.depends_on[var2]
predecessor_vars_without_inds = [x for x in predecessor_vars if x not in self.independent_vars]
if any(vals_for_dependent_vars[vart] is None for vart in predecessor_vars_without_inds): continue
exp = self.expressions_dict[var2]
if self.debug:
print("\n\nSTEP1: [original expression] exp: %s" % exp)
for var in self.independent_vars:
exp = exp.replace(var, str(vals_for_ind_vars[var]))
if self.debug:
print("\n\nSTEP2: [replaced independent vars by their vals] exp: %s" % exp)
for var in self.dependent_vars:
if vals_for_dependent_vars[var] is not None:
exp = exp.replace(var, str(vals_for_dependent_vars[var]))
if self.debug:
print("\n\nSTEP3: [replaced dependent vars by their vals] exp: %s" % exp)
for ele in self.learnable_params:
exp = exp.replace(ele, str(vals_for_learnable_params[ele]))
if self.debug:
print("\n\nSTEP4: [replaced learnable params by their vals] exp: %s" % exp)
vals_for_dependent_vars[var2] = eval( exp.replace('^', '**') )
## ReLU activation:
vals_for_dependent_vars[var2] = 0 if vals_for_dependent_vars[var2] < 0 else vals_for_dependent_vars[var2]
self.true_output_vals[i] = {ovar : vals_for_dependent_vars[ovar] for ovar in self.output_vars}
if self.debug:
print("\n\n\ninput samples: %s" % str(self.dataset_input_samples))
print("\n\n\noutput vals: %s" % str(self.true_output_vals))
#_________________________ End of ComputationalGraphPrimer Class Definition ___________________________
#______________________________ Test code follows _________________________________
if __name__ == '__main__':
pass
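## The module is normally exercised through the scripts in the Examples directory of the
## distribution. A minimal, hypothetical smoke test of the utility methods defined above
## could look like the commented-out sketch below; every keyword value is a placeholder,
## additional constructor options may be needed, and the expressions must have been parsed
## into the graph before the data generators above can be called:
#
#    cgp = ComputationalGraphPrimer(
#              one_neuron_model = True,
#              expressions = ['xw=ab*xa+bc*xb'],        ## a made-up one-neuron network
#              output_vars = ['xw'],
#              dataset_size = 1000,
#              learning_rate = 1e-3,
#              training_iterations = 4000,
#          )
#    training_data = cgp.gen_training_data()
#    print(len(training_data[0]), len(training_data[1]))    ## expect 500 and 500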