__version__ = '3.4.3'
__author__ = "Avinash Kak (kak@purdue.edu)"
__date__ = '2016-May-14'
__url__ = 'https://engineering.purdue.edu/kak/distDT/DecisionTree-3.4.3.html'
__copyright__ = "(C) 2016 Avinash Kak. Python Software Foundation."
import math
import re
import sys
import functools
import operator
import itertools
import string
import os
#----------------------------------- Utility Functions ------------------------------------
def sample_index(sample_name):
'''
When the training data is read from a CSV file, we assume that the first column
of each data record contains a unique integer identifier for the record in that
    row.  The training data is stored in a dictionary whose keys are strings formed by
    joining the prefix 'sample_' with that identifying integer.  The purpose of this
    function is to extract and return the integer from such a sample name.
'''
m = re.search('_(.+)$', sample_name)
return int(m.group(1))
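# Illustrative example (not executed): for a record stored under the key 'sample_127',
# sample_index('sample_127') returns the integer 127.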
def deep_copy_array(array_in):
'''
Meant only for an array of scalars (no nesting):
'''
array_out = []
for i in range(len(array_in)):
array_out.append( array_in[i] )
return array_out
def minimum(arr):
'''
Returns simultaneously the minimum value and its positional index in an
array. [Could also have used min() and index() defined for Python's
sequence types.]
'''
minval,index = None,None
for i in range(0, len(arr)):
if minval is None or arr[i] < minval:
index = i
minval = arr[i]
return minval,index
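# Illustrative example (not executed): minimum([3.2, 0.5, 1.7]) returns (0.5, 1), that
# is, the smallest value together with its positional index.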
def convert(value):
    try:
        answer = float(value)
        return answer
    except (ValueError, TypeError):
        return value
def closest_sampling_point(value, arr):
if value == 'NA': return None # in version 3.3.0
compare = [abs(x - value) for x in arr]
minval,index = minimum(compare)
return arr[index]
def cleanup_csv(line):
    # Both maketrans() calls need 'from' and 'to' strings of equal length; each of the
    # ten punctuation characters is mapped to a blank space.
    line = line.translate(bytes.maketrans(b":?/()[]{}'", b"          ")) \
           if sys.version_info[0] == 3 else line.translate(string.maketrans(":?/()[]{}'", "          "))
double_quoted = re.findall(r'"[^\"]+"', line[line.find(',') : ])
for item in double_quoted:
clean = re.sub(r',', r'', item[1:-1].strip())
parts = re.split(r'\s+', clean.strip())
line = str.replace(line, item, '_'.join(parts))
white_spaced = re.findall(r',(\s*[^,]+)(?=,|$)', line)
for item in white_spaced:
litem = item
litem = re.sub(r'\s+', '_', litem)
litem = re.sub(r'^\s*_|_\s*$', '', litem)
line = str.replace(line, "," + item, "," + litem) if line.endswith(item) else str.replace(line, "," + item + ",", "," + litem + ",")
fields = re.split(r',', line)
newfields = []
for field in fields:
newfield = field.strip()
if newfield == '':
newfields.append('NA')
else:
newfields.append(newfield)
line = ','.join(newfields)
return line
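# Illustrative sketch (not executed) of what cleanup_csv() does to a raw CSV record; the
# field values shown are made up.  Double-quoted and multi-word fields are joined with
# underscores, the punctuation characters :?/()[]{}' are blanked out, and empty fields
# become 'NA':
#
#     '3,"staff courtesy",good    service,,4.5'   ==>   '3,staff_courtesy,good_service,NA,4.5'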
#----------------------------------- DecisionTree Class Definition ------------------------------------
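# A typical constructor call looks like the sketch below.  The filename and the column
# indices are made-up values: csv_class_column_index points to the column that holds the
# class labels, csv_columns_for_features lists the columns to be used as features, and
# column 0 is assumed to hold the unique integer sample identifiers.
#
#     dt = DecisionTree(training_datafile = "my_training_data.csv",
#                       csv_class_column_index = 2,
#                       csv_columns_for_features = [3,4,5,6,7,8],
#                       entropy_threshold = 0.01,
#                       max_depth_desired = 8,
#                       symbolic_to_numeric_cardinality_threshold = 10,
#                       csv_cleanup_needed = 1)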
class DecisionTree(object):
def __init__(self, *args, **kwargs ):
if kwargs and args:
raise SyntaxError(
                '''DecisionTree constructor can only be called with keyword arguments for the
                   following keywords: training_datafile, entropy_threshold, max_depth_desired,
                   csv_class_column_index, symbolic_to_numeric_cardinality_threshold,
                   csv_columns_for_features, number_of_histogram_bins, csv_cleanup_needed,
                   debug1, debug2, and debug3''')
allowed_keys = 'training_datafile','entropy_threshold','max_depth_desired','csv_class_column_index',\
'symbolic_to_numeric_cardinality_threshold','csv_columns_for_features',\
'number_of_histogram_bins','csv_cleanup_needed','debug1','debug2','debug3'
keywords_used = kwargs.keys()
for keyword in keywords_used:
if keyword not in allowed_keys:
raise SyntaxError(keyword + ": Wrong keyword used --- check spelling")
training_datafile=entropy_threshold=max_depth_desired=csv_class_column_index=number_of_histogram_bins= None
symbolic_to_numeric_cardinality_threshold=csv_columns_for_features=csv_cleanup_needed=debug1=debug2=debug3=None
if kwargs and not args:
if 'training_datafile' in kwargs : training_datafile = kwargs.pop('training_datafile')
if 'entropy_threshold' in kwargs : entropy_threshold = kwargs.pop('entropy_threshold')
if 'max_depth_desired' in kwargs : max_depth_desired = kwargs.pop('max_depth_desired')
if 'csv_class_column_index' in kwargs: csv_class_column_index = kwargs.pop('csv_class_column_index')
if 'csv_columns_for_features' in kwargs: \
csv_columns_for_features = kwargs.pop('csv_columns_for_features')
if 'symbolic_to_numeric_cardinality_threshold' in kwargs: \
symbolic_to_numeric_cardinality_threshold = \
kwargs.pop('symbolic_to_numeric_cardinality_threshold')
if 'number_of_histogram_bins' in kwargs: \
number_of_histogram_bins = kwargs.pop('number_of_histogram_bins')
if 'csv_cleanup_needed' in kwargs: csv_cleanup_needed = kwargs.pop('csv_cleanup_needed')
if 'debug1' in kwargs : debug1 = kwargs.pop('debug1')
if 'debug2' in kwargs : debug2 = kwargs.pop('debug2')
if 'debug3' in kwargs : debug3 = kwargs.pop('debug3')
if not args and training_datafile:
self._training_datafile = training_datafile
elif not args and not training_datafile:
raise SyntaxError('''You must specify a training datafile''')
else:
if (args[0] != 'evalmode') and (args[0] != 'boostingmode'):
raise SyntaxError("""When supplying non-keyword arg, it can only be 'evalmode' or 'boostingmode'""")
if entropy_threshold:
self._entropy_threshold = entropy_threshold
else:
self._entropy_threshold = 0.01
if max_depth_desired:
self._max_depth_desired = max_depth_desired
else:
self._max_depth_desired = None
if number_of_histogram_bins:
self._number_of_histogram_bins = number_of_histogram_bins
else:
self._number_of_histogram_bins = None
if csv_class_column_index:
self._csv_class_column_index = csv_class_column_index
else:
self._csv_class_column_index = None
if csv_columns_for_features:
self._csv_columns_for_features = csv_columns_for_features
else:
self._csv_columns_for_features = None
if symbolic_to_numeric_cardinality_threshold:
self._symbolic_to_numeric_cardinality_threshold = symbolic_to_numeric_cardinality_threshold
else:
self._symbolic_to_numeric_cardinality_threshold = 10
if csv_cleanup_needed:
self._csv_cleanup_needed = csv_cleanup_needed
else:
self._csv_cleanup_needed = 0
if debug1:
self._debug1 = debug1
else:
self._debug1 = 0
if debug2:
self._debug2 = debug2
else:
self._debug2 = 0
if debug3:
self._debug3 = debug3
else:
self._debug3 = 0
self._root_node = None
self._probability_cache = {}
self._entropy_cache = {}
self._training_data_dict = {}
self._features_and_values_dict = {}
self._features_and_unique_values_dict = {}
self._samples_class_label_dict = {}
self._class_names = []
self._class_priors_dict = {}
self._feature_names = []
self._numeric_features_valuerange_dict = {}
self._sampling_points_for_numeric_feature_dict = {}
self._feature_values_how_many_uniques_dict = {}
self._prob_distribution_numeric_features_dict = {}
self._histogram_delta_dict = {}
self._num_of_histogram_bins_dict = {}
def get_training_data(self):
if not self._training_datafile.endswith('.csv'):
TypeError("Aborted. get_training_data_from_csv() is only for CSV files")
class_names = []
all_record_ids_with_class_labels = {}
firstline = None
data_dict = {}
with open(self._training_datafile) as f:
for i,line in enumerate(f):
record = cleanup_csv(line) if self._csv_cleanup_needed else line
if i == 0:
firstline = record
continue
parts = record.rstrip().split(r',')
data_dict[parts[0].strip('"')] = parts[1:]
class_names.append(parts[self._csv_class_column_index])
all_record_ids_with_class_labels[parts[0].strip('"')] = parts[self._csv_class_column_index]
if i%10000 == 0:
                    sys.stdout.write('.')
sys.stdout.flush()
sys.stdout = sys.__stdout__
f.close()
        self._how_many_total_training_samples = i   # the header line is at i == 0, so i equals the number of data records
unique_class_names = list(set(class_names))
if self._debug1:
print("\n\nTotal number of training samples: %d\n" % self._how_many_total_training_samples)
all_feature_names = firstline.rstrip().split(',')[1:]
class_column_heading = all_feature_names[self._csv_class_column_index - 1]
feature_names = [all_feature_names[i-1] for i in self._csv_columns_for_features]
class_for_sample_dict = { "sample_" + key :
class_column_heading + "=" + data_dict[key][self._csv_class_column_index - 1] for key in data_dict}
sample_names = ["sample_" + key for key in data_dict]
feature_values_for_samples_dict = {"sample_" + key :
list(map(operator.add, list(map(operator.add, feature_names, "=" * len(feature_names))),
[str(convert(data_dict[key][i-1])) for i in self._csv_columns_for_features]))
for key in data_dict}
features_and_values_dict = {all_feature_names[i-1] :
[convert(data_dict[key][i-1]) for key in data_dict] for i in self._csv_columns_for_features}
all_class_names = sorted(list(set(class_for_sample_dict.values())))
if self._debug1: print("\n All class names: "+ str(all_class_names))
numeric_features_valuerange_dict = {}
feature_values_how_many_uniques_dict = {}
features_and_unique_values_dict = {}
for feature in features_and_values_dict:
unique_values_for_feature = list(set(features_and_values_dict[feature]))
unique_values_for_feature = sorted(list(filter(lambda x: x != 'NA', unique_values_for_feature)))
feature_values_how_many_uniques_dict[feature] = len(unique_values_for_feature)
if all(isinstance(x,float) for x in unique_values_for_feature):
numeric_features_valuerange_dict[feature] = \
[min(unique_values_for_feature), max(unique_values_for_feature)]
unique_values_for_feature.sort(key=float)
features_and_unique_values_dict[feature] = sorted(unique_values_for_feature)
if self._debug1:
print("\nAll class names: " + str(all_class_names))
print("\nEach sample data record:")
for item in sorted(feature_values_for_samples_dict.items(), key = lambda x: sample_index(x[0]) ):
print(item[0] + " => " + str(item[1]))
print("\nclass label for each data sample:")
for item in sorted(class_for_sample_dict.items(), key=lambda x: sample_index(x[0])):
print(item[0] + " => " + str(item[1]))
print("\nfeatures and the values taken by them:")
for item in sorted(features_and_values_dict.items()):
print(item[0] + " => " + str(item[1]))
print("\nnumeric features and their ranges:")
for item in sorted(numeric_features_valuerange_dict.items()):
print(item[0] + " => " + str(item[1]))
print("\nnumber of unique values in each feature:")
for item in sorted(feature_values_how_many_uniques_dict.items()):
print(item[0] + " => " + str(item[1]))
self._class_names = all_class_names
self._feature_names = feature_names
self._samples_class_label_dict = class_for_sample_dict
self._training_data_dict = feature_values_for_samples_dict
self._features_and_values_dict = features_and_values_dict
self._features_and_unique_values_dict = features_and_unique_values_dict
self._numeric_features_valuerange_dict = numeric_features_valuerange_dict
self._feature_values_how_many_uniques_dict = feature_values_how_many_uniques_dict
def calculate_first_order_probabilities(self):
print("\nEstimating probabilities...")
for feature in self._feature_names:
self.probability_of_feature_value(feature,None)
if self._debug2:
if feature in self._prob_distribution_numeric_features_dict:
print("\nPresenting probability distribution for a feature considered to be numeric:")
for sampling_point in \
sorted(self._prob_distribution_numeric_features_dict[feature].keys()):
print(feature + '::' + str(sampling_point) + ' = ' + "{0:.5f}".format(
self._prob_distribution_numeric_features_dict[feature][sampling_point]))
else:
print("\nPresenting probabilities for the values of a feature considered to be symbolic:")
values_for_feature = self._features_and_unique_values_dict[feature]
for value in values_for_feature:
prob = self.probability_of_feature_value(feature,value)
print(feature + '::' + str(value) + ' = ' + "{0:.5f}".format(prob))
def show_training_data(self):
print("Class names: %s" % str(self._class_names))
print("\n\nFeatures and Their Values:\n\n")
features = self._features_and_values_dict.keys()
for feature in sorted(features):
nums,strings = [x for x in self._features_and_values_dict[feature] if isinstance(x,(float,int))],[x for x in self._features_and_values_dict[feature] if not isinstance(x,(float,int))]
print("%s ---> %s" % (feature, sorted(nums) + strings))
print("\n\nSamples vs. Class Labels:\n\n")
for item in sorted(self._samples_class_label_dict.items(), key = lambda x: sample_index(x[0]) ):
print(item)
print("\n\nTraining Samples:\n\n")
for item in sorted(self._training_data_dict.items(), key = lambda x: sample_index(x[0]) ):
print(item)
#----------------------------- Classify with Decision Tree --------------------------------
def classify(self, root_node, features_and_values):
'''
Classifies one test sample at a time using the decision tree constructed from
your training file. The data record for the test sample must be supplied as
shown in the scripts in the `Examples' subdirectory. See the scripts
construct_dt_and_classify_one_sample_caseX.py in that subdirectory.
'''
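        # Illustrative usage, mirroring the construct_dt_and_classify_one_sample_caseX.py
        # scripts (the feature names and values below are made up and must match the
        # column headers of your own training file):
        #
        #     test_sample = ['g2 = 4.2', 'grade = 2.3', 'gleason = 4',
        #                    'eet = 1.7', 'age = 55.0', 'ploidy = diploid']
        #     classification = dt.classify(root_node, test_sample)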
if not self._check_names_used(features_and_values):
raise SyntaxError("\n\nError in the names you have used for features and/or values. Try using the csv_cleanup_needed option in the constructor call.")
new_features_and_values = []
pattern = r'(\S+)\s*=\s*(\S+)'
for feature_and_value in features_and_values:
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
value = convert(value)
newvalue = value
if ((feature not in self._prob_distribution_numeric_features_dict) and
(all(isinstance(x,float) for x in self._features_and_unique_values_dict[feature]))):
newvalue = closest_sampling_point(value, self._features_and_unique_values_dict[feature])
new_features_and_values.append(feature + " = " + str(newvalue))
features_and_values = new_features_and_values
if self._debug3: print("\nCL1 New feature and values: "+ str(features_and_values))
answer = {class_name : None for class_name in self._class_names}
answer['solution_path'] = []
classification = self.recursive_descent_for_classification(root_node,features_and_values, answer)
answer['solution_path'].reverse()
if self._debug3:
print("\nCL2 The classification:")
for class_name in self._class_names:
print(" " + class_name + " with probability " + str(classification[class_name]))
classification_for_display = {}
for item in classification:
if isinstance(classification[item], float):
classification_for_display[item] = "%0.3f" % classification[item]
else:
classification_for_display[item] = ["NODE" + str(x) for x in classification[item]]
return classification_for_display
def recursive_descent_for_classification(self, node, feature_and_values, answer):
children = node.get_children()
if len(children) == 0:
leaf_node_class_probabilities = node.get_class_probabilities()
for i in range(len(self._class_names)):
answer[self._class_names[i]] = leaf_node_class_probabilities[i]
answer['solution_path'].append(node.get_serial_num())
return answer
feature_tested_at_node = node.get_feature()
if self._debug3: print("\nCLRD1 Feature tested at node for classification: " + feature_tested_at_node)
value_for_feature = None
path_found = None
pattern = r'(\S+)\s*=\s*(\S+)'
feature,value = None,None
for feature_and_value in feature_and_values:
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
if feature == feature_tested_at_node:
value_for_feature = convert(value)
# added in 3.2.0:
if value_for_feature is None:
leaf_node_class_probabilities = node.get_class_probabilities()
for i in range(len(self._class_names)):
answer[self._class_names[i]] = leaf_node_class_probabilities[i]
answer['solution_path'].append(node.get_serial_num())
return answer
if feature_tested_at_node in self._prob_distribution_numeric_features_dict:
if self._debug3: print( "\nCLRD2 In the truly numeric section")
for child in children:
branch_features_and_values = child.get_branch_features_and_values_or_thresholds()
last_feature_and_value_on_branch = branch_features_and_values[-1]
pattern1 = r'(.+)<(.+)'
pattern2 = r'(.+)>(.+)'
if re.search(pattern1, last_feature_and_value_on_branch):
m = re.search(pattern1, last_feature_and_value_on_branch)
feature,threshold = m.group(1),m.group(2)
if value_for_feature <= float(threshold):
path_found = True
answer = self.recursive_descent_for_classification(child, feature_and_values, answer)
answer['solution_path'].append(node.get_serial_num())
break
if re.search(pattern2, last_feature_and_value_on_branch):
m = re.search(pattern2, last_feature_and_value_on_branch)
feature,threshold = m.group(1),m.group(2)
if value_for_feature > float(threshold):
path_found = True
answer = self.recursive_descent_for_classification(child, feature_and_values, answer)
answer['solution_path'].append(node.get_serial_num())
break
if path_found: return answer
else:
feature_value_combo = "".join([feature_tested_at_node,"=", str(convert(value_for_feature))])
if self._debug3:
print("\nCLRD3 In the symbolic section with feature_value_combo: " + feature_value_combo)
for child in children:
branch_features_and_values = child.get_branch_features_and_values_or_thresholds()
if self._debug3: print("\nCLRD4 branch features and values: "+str(branch_features_and_values))
last_feature_and_value_on_branch = branch_features_and_values[-1]
if last_feature_and_value_on_branch == feature_value_combo:
answer = self.recursive_descent_for_classification(child, feature_and_values, answer)
answer['solution_path'].append(node.get_serial_num())
path_found = True
break
if path_found: return answer
if not path_found:
leaf_node_class_probabilities = node.get_class_probabilities()
for i in range(0, len(self._class_names)):
answer[self._class_names[i]] = leaf_node_class_probabilities[i]
answer['solution_path'].append(node.get_serial_num())
return answer
def classify_by_asking_questions(self, root_node):
'''
If you want classification to be carried out by engaging a human user in a
question-answer session, this is the method to use for that purpose. See the
script classify_by_asking_questions.py in the Examples subdirectory for an
illustration of how to do that.
'''
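        # Illustrative usage (assumes the decision tree has already been constructed and
        # that root_node is the value returned by construct_decision_tree_classifier()):
        #
        #     classification = dt.classify_by_asking_questions(root_node)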
answer = {class_name : None for class_name in self._class_names}
answer['solution_path'] = []
scratchpad_for_numeric_answers = {feature : None
for feature in self._prob_distribution_numeric_features_dict}
classification = self.interactive_recursive_descent_for_classification(root_node,
answer, scratchpad_for_numeric_answers)
classification['solution_path'].reverse()
classification_for_display = {}
for item in classification:
if isinstance(classification[item], float):
classification_for_display[item] = "%0.3f" % classification[item]
else:
classification_for_display[item] = ["NODE" + str(x) for x in classification[item]]
return classification_for_display
def interactive_recursive_descent_for_classification(self, node, answer, scratchpad_for_numerics):
pattern1 = r'(.+)<(.+)'
pattern2 = r'(.+)>(.+)'
user_value_for_feature = None
children = node.get_children()
if len(children) == 0:
leaf_node_class_probabilities = node.get_class_probabilities()
for i in range(len(self._class_names)):
answer[self._class_names[i]] = leaf_node_class_probabilities[i]
answer['solution_path'].append(node.get_serial_num())
return answer
list_of_branch_attributes_to_children = []
for child in children:
branch_features_and_values = child.get_branch_features_and_values_or_thresholds()
feature_and_value_on_branch = branch_features_and_values[-1]
list_of_branch_attributes_to_children.append(feature_and_value_on_branch)
feature_tested_at_node = node.get_feature()
feature_value_combo = None
path_found = None
if feature_tested_at_node in self._prob_distribution_numeric_features_dict:
if scratchpad_for_numerics[feature_tested_at_node]:
user_value_for_feature = scratchpad_for_numerics[feature_tested_at_node]
else:
valuerange = self._numeric_features_valuerange_dict[feature_tested_at_node]
while True:
if sys.version_info[0] == 3:
user_value_for_feature = input( "\nWhat is the value for the feature '" +
feature_tested_at_node + "'?" + "\n" +
"Enter a value in the range: " + str(valuerange) + " => " )
else:
user_value_for_feature = raw_input( "\nWhat is the value for the feature '" +
feature_tested_at_node + "'?" + "\n" +
"Enter a value in the range: " + str(valuerange) + " => " )
user_value_for_feature = convert(user_value_for_feature.strip())
answer_found = 0
                    if isinstance(user_value_for_feature, float) and valuerange[0] <= user_value_for_feature <= valuerange[1]:
answer_found = 1
break
if answer_found == 1: break
print("You entered illegal value. Let's try again")
scratchpad_for_numerics[feature_tested_at_node] = user_value_for_feature
for i in range(len(list_of_branch_attributes_to_children)):
branch_attribute = list_of_branch_attributes_to_children[i]
if re.search(pattern1, branch_attribute):
m = re.search(pattern1, branch_attribute)
feature,threshold = m.group(1),m.group(2)
if user_value_for_feature <= float(threshold):
answer = self.interactive_recursive_descent_for_classification(children[i],
answer, scratchpad_for_numerics)
path_found = True
answer['solution_path'].append(node.get_serial_num())
break
if re.search(pattern2, branch_attribute):
m = re.search(pattern2, branch_attribute)
feature,threshold = m.group(1),m.group(2)
if user_value_for_feature > float(threshold):
answer = self.interactive_recursive_descent_for_classification(children[i],
answer, scratchpad_for_numerics)
                        path_found = True
                        answer['solution_path'].append(node.get_serial_num())
break
if path_found: return answer
else:
possible_values_for_feature = self._features_and_unique_values_dict[feature_tested_at_node]
while True:
if sys.version_info[0] == 3:
user_value_for_feature = \
input( "\nWhat is the value for the feature '" + feature_tested_at_node +
"'?" + "\n" + "Enter one of: " + str(possible_values_for_feature) + " => " )
else:
user_value_for_feature = \
raw_input( "\nWhat is the value for the feature '" + feature_tested_at_node +
"'?" + "\n" + "Enter one of: " + str(possible_values_for_feature) + " => " )
user_value_for_feature = convert(user_value_for_feature.strip())
answer_found = 0
for value in possible_values_for_feature:
if value == user_value_for_feature:
answer_found = 1
break
if answer_found == 1: break
print("You entered illegal value. Let's try again")
feature_value_combo = "".join([feature_tested_at_node,"=",str(user_value_for_feature)])
for i in range(len(list_of_branch_attributes_to_children)):
branch_attribute = list_of_branch_attributes_to_children[i]
if branch_attribute == feature_value_combo:
answer = self.interactive_recursive_descent_for_classification(children[i],
answer, scratchpad_for_numerics)
path_found = True
answer['solution_path'].append(node.get_serial_num())
break
if path_found: return answer
if not path_found:
leaf_node_class_probabilities = node.get_class_probabilities()
for i in range(0, len(self._class_names)):
answer[self._class_names[i]] = leaf_node_class_probabilities[i]
answer['solution_path'].append(node.get_serial_num())
return answer
##------------------------------- Construct Decision Tree ------------------------------------
def construct_decision_tree_classifier(self):
'''
Construct the root node object and set its entropy value as derived from the priors
associated with the different classes.
'''
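        # Illustrative call sequence for building the tree (assumes 'dt' was constructed
        # with a training_datafile, as sketched near the top of this file):
        #
        #     dt.get_training_data()
        #     dt.calculate_first_order_probabilities()
        #     dt.calculate_class_priors()
        #     root_node = dt.construct_decision_tree_classifier()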
print("\nConstructing a decision tree...")
if self._debug3:
self.determine_data_condition()
print("\nStarting construction of the decision tree:\n")
class_probabilities = list(map(lambda x: self.prior_probability_for_class(x), self._class_names))
if self._debug3:
print("\nPrior class probabilities: " + str(class_probabilities))
print("\nClass names: " + str(self._class_names))
entropy = self.class_entropy_on_priors()
if self._debug3: print("\nClass entropy on priors: "+ str(entropy))
root_node = DTNode(None, entropy, class_probabilities, [], self, 'root')
root_node.set_class_names(self._class_names)
self._root_node = root_node
self.recursive_descent(root_node)
return root_node
def recursive_descent(self, node):
'''
After the root node of the decision tree is constructed by the previous method, we
find at that node the feature that yields the greatest reduction in class entropy
from the entropy based on just the class priors. The logic for finding this
feature is different for symbolic features and for numeric features (that logic is
built into the best feature calculator). We then invoke this method recursively to
create the rest of the tree.
'''
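        # In short: at each node we pick the feature (and, for a numeric feature, the
        # threshold) that minimizes the class entropy of the resulting split, and a child
        # node is grown only when the expected entropy gain,
        #
        #     entropy_gain = existing_node_entropy - best_feature_entropy,
        #
        # exceeds self._entropy_threshold and the branch has not yet reached
        # max_depth_desired (when that option is set).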
if self._debug3:
print("\n==================== ENTERING RECURSIVE DESCENT ==========================")
node_serial_number = node.get_serial_num()
features_and_values_or_thresholds_on_branch = node.get_branch_features_and_values_or_thresholds()
existing_node_entropy = node.get_node_entropy()
if self._debug3:
print("\nRD1 NODE SERIAL NUMBER: "+ str(node_serial_number))
print("\nRD2 Existing Node Entropy: " + str(existing_node_entropy))
print("\nRD3 features_and_values_or_thresholds_on_branch: " +
str(features_and_values_or_thresholds_on_branch))
class_probs = node.get_class_probabilities()
print("\nRD4 Class probabilities: " + str(class_probs))
if existing_node_entropy < self._entropy_threshold:
if self._debug3: print("\nRD5 returning because existing node entropy is below threshold")
return
copy_of_path_attributes = deep_copy_array(features_and_values_or_thresholds_on_branch)
best_feature,best_feature_entropy,best_feature_val_entropies,decision_val = \
self.best_feature_calculator(copy_of_path_attributes, existing_node_entropy)
node.set_feature(best_feature)
if self._debug3: node.display_node()
if self._max_depth_desired is not None and \
len(features_and_values_or_thresholds_on_branch) >= self._max_depth_desired:
if self._debug3: print("\nRD6 REACHED LEAF NODE AT MAXIMUM DEPTH ALLOWED")
return
if best_feature is None: return
if self._debug3:
print("\nRD7 Existing entropy at node: " + str(existing_node_entropy))
print("\nRD8 Calculated best feature is %s and its value %s" % (best_feature, decision_val))
print("\nRD9 Best feature entropy: "+ str(best_feature_entropy))
print("\nRD10 Calculated entropies for different values of best feature: %s" % str(best_feature_val_entropies))
entropy_gain = existing_node_entropy - best_feature_entropy
if self._debug3: print("\nRD11 Expected entropy gain at this node: " + str(entropy_gain))
if entropy_gain > self._entropy_threshold:
if best_feature in self._numeric_features_valuerange_dict and \
self._feature_values_how_many_uniques_dict[best_feature] > \
self._symbolic_to_numeric_cardinality_threshold:
best_threshold = decision_val # as returned by best feature calculator
best_entropy_for_less, best_entropy_for_greater = best_feature_val_entropies
extended_branch_features_and_values_or_thresholds_for_lessthan_child = \
deep_copy_array(features_and_values_or_thresholds_on_branch)
extended_branch_features_and_values_or_thresholds_for_greaterthan_child = \
deep_copy_array(features_and_values_or_thresholds_on_branch)
feature_threshold_combo_for_less_than = \
"".join([best_feature,"<",str(convert(best_threshold))])
feature_threshold_combo_for_greater_than = \
"".join([best_feature,">",str(convert(best_threshold))])
extended_branch_features_and_values_or_thresholds_for_lessthan_child.append(
feature_threshold_combo_for_less_than)
extended_branch_features_and_values_or_thresholds_for_greaterthan_child.append(
feature_threshold_combo_for_greater_than)
if self._debug3:
print("\nRD12 extended_branch_features_and_values_or_thresholds_for_lessthan_child: "+
str(extended_branch_features_and_values_or_thresholds_for_lessthan_child))
print("\nRD13 extended_branch_features_and_values_or_thresholds_for_greaterthan_child: " +
str(extended_branch_features_and_values_or_thresholds_for_greaterthan_child))
class_probabilities_for_lessthan_child_node = list(map(lambda x:
self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(\
x, extended_branch_features_and_values_or_thresholds_for_lessthan_child), self._class_names))
class_probabilities_for_greaterthan_child_node = list(map(lambda x:
self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(
x, extended_branch_features_and_values_or_thresholds_for_greaterthan_child),
self._class_names))
if self._debug3:
print("\nRD14 class entropy for going down lessthan child: " + str(best_entropy_for_less))
print("\nRD15 class_entropy_for_going_down_greaterthan_child: " +
str(best_entropy_for_greater))
if best_entropy_for_less < existing_node_entropy - self._entropy_threshold:
left_child_node = DTNode(None, best_entropy_for_less,
class_probabilities_for_lessthan_child_node,
extended_branch_features_and_values_or_thresholds_for_lessthan_child, self)
node.add_child_link(left_child_node)
self.recursive_descent(left_child_node)
if best_entropy_for_greater < existing_node_entropy - self._entropy_threshold:
right_child_node = DTNode(None, best_entropy_for_greater,
class_probabilities_for_greaterthan_child_node,
extended_branch_features_and_values_or_thresholds_for_greaterthan_child, self)
node.add_child_link(right_child_node)
self.recursive_descent(right_child_node)
else:
if self._debug3:
print("\nRD16 RECURSIVE DESCENT: In section for symbolic features for creating children")
values_for_feature = self._features_and_unique_values_dict[best_feature]
if self._debug3:
print("\nRD17 Values for feature %s are %s" % (best_feature, str(values_for_feature)))
feature_value_combos = \
map(lambda x: "".join([best_feature,"=",x]), map(str, map(convert, values_for_feature)))
feature_value_combos = sorted(feature_value_combos)
class_entropies_for_children = []
for feature_and_value_index in range(len(feature_value_combos)):
if self._debug3:
print("\nRD18 Creating a child node for: " +
str(feature_value_combos[feature_and_value_index]))
extended_branch_features_and_values_or_thresholds = None
if features_and_values_or_thresholds_on_branch is None:
extended_branch_features_and_values_or_thresholds = \
[feature_value_combos[feature_and_value_index]]
else:
extended_branch_features_and_values_or_thresholds = \
deep_copy_array(features_and_values_or_thresholds_on_branch)
extended_branch_features_and_values_or_thresholds.append(
feature_value_combos[feature_and_value_index])
class_probabilities = list(map(lambda x:
self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(
x, extended_branch_features_and_values_or_thresholds), self._class_names))
class_entropy_for_child = \
self.class_entropy_for_a_given_sequence_of_features_and_values_or_thresholds(
extended_branch_features_and_values_or_thresholds)
if self._debug3:
print("\nRD19 branch attributes: "+
str(extended_branch_features_and_values_or_thresholds))
print("\nRD20 class entropy for child: " + str(class_entropy_for_child))
if existing_node_entropy - class_entropy_for_child > self._entropy_threshold:
child_node = DTNode(None, class_entropy_for_child,
class_probabilities, extended_branch_features_and_values_or_thresholds, self)
node.add_child_link( child_node )
self.recursive_descent(child_node)
elif self._debug3: print("\nRD21 This child will NOT result in a node")
else:
if self._debug3:
print("\nRD22 REACHED LEAF NODE NATURALLY for: " +
str(features_and_values_or_thresholds_on_branch))
return
def best_feature_calculator(self, features_and_values_or_thresholds_on_branch, existing_node_entropy):
'''
This is the heart of the decision tree constructor. Its main job is to figure
out the best feature to use for partitioning the training data samples that
correspond to the current node. The search for the best feature is carried
out differently for symbolic features and for numeric features. For a
symbolic feature, the method estimates the entropy for each value of the
feature and then averages out these entropies as a measure of the
        discriminatory power of that feature.  For a numeric feature, on the other
hand, it estimates the entropy reduction that can be achieved if we were to
partition the set of training samples at each possible threshold for that
numeric feature. For a numeric feature, all possible sampling points
relevant to the node in question are considered as candidates for thresholds.
'''
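        # The two selection criteria implemented below, written out (the probabilities are
        # joint probabilities over the full sequence of branch attributes):
        #
        #   symbolic feature f:
        #       H(f)   = sum over values v of   p(branch, f=v) * H(C | branch, f=v)
        #
        #   numeric feature f at a candidate threshold t:
        #       H(f,t) = p(branch, f<=t) * H(C | branch, f<=t)  +  p(branch, f>t) * H(C | branch, f>t)
        #
        # The feature (and, for a numeric feature, the threshold) yielding the smallest such
        # entropy is returned, provided that entropy is below the entropy already at the node.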
pattern1 = r'(.+)=(.+)'
pattern2 = r'(.+)<(.+)'
pattern3 = r'(.+)>(.+)'
all_symbolic_features = []
for feature_name in self._feature_names:
if feature_name not in self._prob_distribution_numeric_features_dict:
all_symbolic_features.append(feature_name)
symbolic_features_already_used = []
for feature_and_value_or_threshold in features_and_values_or_thresholds_on_branch:
if re.search(pattern1, feature_and_value_or_threshold):
m = re.search(pattern1, feature_and_value_or_threshold)
feature = m.group(1)
symbolic_features_already_used.append(feature)
symbolic_features_not_yet_used = [x for x in all_symbolic_features if x not in symbolic_features_already_used]
true_numeric_types = []
symbolic_types = []
true_numeric_types_feature_names = []
symbolic_types_feature_names = []
for item in features_and_values_or_thresholds_on_branch:
if re.search(pattern2, item):
true_numeric_types.append(item)
m = re.search(pattern2, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
elif re.search(pattern3, item):
true_numeric_types.append(item)
m = re.search(pattern3, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
else:
symbolic_types.append(item)
m = re.search(pattern1, item)
feature,value = m.group(1),m.group(2)
symbolic_types_feature_names.append(feature)
true_numeric_types_feature_names = list(set(true_numeric_types_feature_names))
symbolic_types_feature_names = list(set(symbolic_types_feature_names))
bounded_intervals_numeric_types = self.find_bounded_intervals_for_numeric_features(true_numeric_types)
# Calculate the upper and the lower bounds to be used when searching for the best
# threshold for each of the numeric features that are in play at the current node:
upperbound = {feature : None for feature in true_numeric_types_feature_names}
lowerbound = {feature : None for feature in true_numeric_types_feature_names}
for item in bounded_intervals_numeric_types:
if item[1] == '>':
lowerbound[item[0]] = float(item[2])
else:
upperbound[item[0]] = float(item[2])
entropy_values_for_different_features = {}
partitioning_point_child_entropies_dict = {feature : {} for feature in self._feature_names}
partitioning_point_threshold = {feature : None for feature in self._feature_names}
entropies_for_different_values_of_symbolic_feature ={feature : [] for feature in self._feature_names}
for i in range(len(self._feature_names)):
feature_name = self._feature_names[i]
if self._debug3:
print("\n\nBFC1 FEATURE BEING CONSIDERED: " + feature_name)
if feature_name in symbolic_features_already_used:
continue
elif feature_name in self._numeric_features_valuerange_dict and \
self._feature_values_how_many_uniques_dict[feature_name] > \
self._symbolic_to_numeric_cardinality_threshold:
values = self._sampling_points_for_numeric_feature_dict[feature_name]
if self._debug3: print("\nBFC2 values for %s are %s " % (feature_name, values))
newvalues = []
if feature_name in true_numeric_types_feature_names:
if upperbound[feature_name] is not None and lowerbound[feature_name] is not None and \
lowerbound[feature_name] >= upperbound[feature_name]:
continue
elif upperbound[feature_name] is not None and lowerbound[feature_name] is not None and \
lowerbound[feature_name] < upperbound[feature_name]:
newvalues = [x for x in values \
if lowerbound[feature_name] < x <= upperbound[feature_name]]
elif upperbound[feature_name] is not None:
newvalues = [x for x in values if x <= upperbound[feature_name]]
elif lowerbound[feature_name] is not None:
newvalues = [x for x in values if x > lowerbound[feature_name]]
else:
raise Exception("Error in bound specifications in best feature calculator")
else:
newvalues = values
if len(newvalues) == 0:
continue
partitioning_entropies = []
for value in newvalues:
feature_and_less_than_value_string = "".join([feature_name,"<",str(convert(value))])
feature_and_greater_than_value_string = "".join([feature_name,">",str(convert(value))])
for_left_child = for_right_child = None
if features_and_values_or_thresholds_on_branch:
for_left_child = deep_copy_array(features_and_values_or_thresholds_on_branch)
for_left_child.append(feature_and_less_than_value_string)
for_right_child = deep_copy_array(features_and_values_or_thresholds_on_branch)
for_right_child.append(feature_and_greater_than_value_string)
else:
for_left_child = [feature_and_less_than_value_string]
for_right_child = [feature_and_greater_than_value_string]
entropy1 = self.class_entropy_for_less_than_threshold_for_feature(\
features_and_values_or_thresholds_on_branch,feature_name,value)
entropy2 = self.class_entropy_for_greater_than_threshold_for_feature(\
features_and_values_or_thresholds_on_branch,feature_name, value)
partitioning_entropy = entropy1 * \
self.probability_of_a_sequence_of_features_and_values_or_thresholds(for_left_child) \
+ entropy2 * \
self.probability_of_a_sequence_of_features_and_values_or_thresholds(for_right_child)
partitioning_entropies.append(partitioning_entropy)
partitioning_point_child_entropies_dict[feature_name][value] = (entropy1, entropy2)
min_entropy,best_partition_point_index = minimum(partitioning_entropies)
if min_entropy < existing_node_entropy:
partitioning_point_threshold[feature_name] = newvalues[best_partition_point_index]
entropy_values_for_different_features[feature_name] = min_entropy
else:
if self._debug3:
print("\nBFC3 Best feature calculator: Entering section reserved for symbolic features")
print("\nBFC4 Feature name: " + feature_name)
values = self._features_and_unique_values_dict[feature_name]
values = sorted(list(set(filter(lambda x: x != 'NA', values))))
if self._debug3: print("\nBFC5 values for feature %s are %s" % (feature_name, values))
entropy = 0
for value in values:
feature_value_string = feature_name + "=" + str(convert(value))
if self._debug3: print("\nBFC6 feature_value_string: " + feature_value_string)
extended_attributes = deep_copy_array(features_and_values_or_thresholds_on_branch)
if features_and_values_or_thresholds_on_branch:
extended_attributes.append(feature_value_string)
else:
extended_attributes = [feature_value_string]
entropy += \
self.class_entropy_for_a_given_sequence_of_features_and_values_or_thresholds(extended_attributes) * \
self.probability_of_a_sequence_of_features_and_values_or_thresholds(extended_attributes)
if self._debug3:
print("\nBFC7 Entropy calculated for symbolic feature value choice (%s, %s) is %s" % \
(feature_name,value,entropy))
entropies_for_different_values_of_symbolic_feature[feature_name].append(entropy)
if entropy < existing_node_entropy:
entropy_values_for_different_features[feature_name] = entropy
min_entropy_for_best_feature = None
best_feature_name = None
# sorting introduced in 3.2.0:
for feature_nom in sorted(entropy_values_for_different_features):
if not best_feature_name:
best_feature_name = feature_nom
min_entropy_for_best_feature = entropy_values_for_different_features[feature_nom]
else:
if entropy_values_for_different_features[feature_nom] < min_entropy_for_best_feature:
best_feature_name = feature_nom
min_entropy_for_best_feature = entropy_values_for_different_features[feature_nom]
if best_feature_name in partitioning_point_threshold:
threshold_for_best_feature = partitioning_point_threshold[best_feature_name]
else:
threshold_for_best_feature = None
best_feature_entropy = min_entropy_for_best_feature
val_based_entropies_to_be_returned = None
decision_val_to_be_returned = None
if best_feature_name in self._numeric_features_valuerange_dict and \
self._feature_values_how_many_uniques_dict[best_feature_name] > \
self._symbolic_to_numeric_cardinality_threshold:
val_based_entropies_to_be_returned = \
partitioning_point_child_entropies_dict[best_feature_name][threshold_for_best_feature]
else:
val_based_entropies_to_be_returned = None
if best_feature_name in partitioning_point_threshold:
decision_val_to_be_returned = partitioning_point_threshold[best_feature_name]
else:
decision_val_to_be_returned = None
if self._debug3:
print("\nBFC8 Val based entropies to be returned for feature %s are %s" %
(best_feature_name, str(val_based_entropies_to_be_returned)))
return best_feature_name, best_feature_entropy, val_based_entropies_to_be_returned, decision_val_to_be_returned
#----------------------------------- Entropy Calculators ------------------------------------
def class_entropy_on_priors(self):
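        # Computes the Shannon entropy of the class priors,
        #
        #     H = - sum over classes c of  p(c) * log2(p(c)),
        #
        # with log2(p(c)) clamped to 0 when p(c) is essentially 0 or 1, so that such
        # classes contribute nothing to the sum.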
if 'priors' in self._entropy_cache:
return self._entropy_cache['priors']
entropy = None
for class_name in self._class_names:
prob = self.prior_probability_for_class(class_name)
if (prob >= 0.0001) and (prob <= 0.999):
log_prob = math.log(prob,2)
if prob < 0.0001:
log_prob = 0
if prob > 0.999:
log_prob = 0
if entropy is None:
entropy = -1.0 * prob * log_prob
continue
entropy += -1.0 * prob * log_prob
if abs(entropy) < 0.0000001: entropy = 0.0
self._entropy_cache['priors'] = entropy
return entropy
def entropy_scanner_for_a_numeric_feature(self, feature):
all_sampling_points = self._sampling_points_for_numeric_feature_dict[feature]
entropies_for_less_than_thresholds = []
entropies_for_greater_than_thresholds = []
for point in all_sampling_points:
entropies_for_less_than_thresholds.append(
self.class_entropy_for_less_than_threshold_for_feature([], feature, point))
entropies_for_greater_than_thresholds.append(
self.class_entropy_for_greater_than_threshold_for_feature([], feature, point))
print("\nSCANNER: All entropies less than thresholds for feature %s are: %s" %
(feature, entropies_for_less_than_thresholds))
print("\nSCANNER: All entropies greater than thresholds for feature %s are: %s" %
(feature, entropies_for_greater_than_thresholds))
def class_entropy_for_less_than_threshold_for_feature(self, \
array_of_features_and_values_or_thresholds, feature, threshold):
threshold = convert(threshold)
feature_threshold_combo = feature + '<' + str(threshold)
sequence = ":".join(array_of_features_and_values_or_thresholds) + ":" + feature_threshold_combo
if sequence in self._entropy_cache:
return self._entropy_cache[sequence]
copy_of_array_of_features_and_values_or_thresholds = \
deep_copy_array(array_of_features_and_values_or_thresholds)
copy_of_array_of_features_and_values_or_thresholds.append(feature_threshold_combo)
entropy = None
for class_name in self._class_names:
prob = self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(\
class_name, copy_of_array_of_features_and_values_or_thresholds)
if (prob >= 0.0001) and (prob <= 0.999):
log_prob = math.log(prob,2)
if prob < 0.0001:
log_prob = 0
if prob > 0.999:
log_prob = 0
if entropy is None:
entropy = -1.0 * prob * log_prob
continue
entropy += -1.0 * prob * log_prob
        if abs(entropy) < 0.0000001: entropy = 0.0
        self._entropy_cache[sequence] = entropy
return entropy
def class_entropy_for_greater_than_threshold_for_feature(self,
array_of_features_and_values_or_thresholds, feature, threshold):
threshold = convert(threshold)
feature_threshold_combo = feature + '>' + str(threshold)
sequence = ":".join(array_of_features_and_values_or_thresholds) + ":" + feature_threshold_combo
if sequence in self._entropy_cache:
return self._entropy_cache[sequence]
copy_of_array_of_features_and_values_or_thresholds = \
deep_copy_array(array_of_features_and_values_or_thresholds)
copy_of_array_of_features_and_values_or_thresholds.append(feature_threshold_combo)
entropy = None
for class_name in self._class_names:
prob = self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(\
class_name, copy_of_array_of_features_and_values_or_thresholds)
if (prob >= 0.0001) and (prob <= 0.999):
log_prob = math.log(prob,2)
if prob < 0.0001:
log_prob = 0
if prob > 0.999:
log_prob = 0
if entropy is None:
entropy = -1.0 * prob * log_prob
continue
entropy += -1.0 * prob * log_prob
if abs(entropy) < 0.0000001: entropy = 0.0
self._entropy_cache[sequence] = entropy
return entropy
def class_entropy_for_a_given_sequence_of_features_and_values_or_thresholds(self,
array_of_features_and_values_or_thresholds):
sequence = ":".join(array_of_features_and_values_or_thresholds)
if sequence in self._entropy_cache:
return self._entropy_cache[sequence]
entropy = None
for class_name in self._class_names:
prob = self.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(\
class_name, array_of_features_and_values_or_thresholds)
if (prob >= 0.0001) and (prob <= 0.999):
log_prob = math.log(prob,2)
if prob < 0.0001:
log_prob = 0
if prob > 0.999:
log_prob = 0
if entropy is None:
entropy = -1.0 * prob * log_prob
continue
entropy += -1.0 * prob * log_prob
if abs(entropy) < 0.0000001: entropy = 0.0
self._entropy_cache[sequence] = entropy
return entropy
#--------------------------------- Probability Calculators ----------------------------------
def prior_probability_for_class(self, class_name):
class_name_in_cache = "".join(["prior::", class_name])
if class_name_in_cache in self._probability_cache:
return self._probability_cache[class_name_in_cache]
total_num_of_samples = len( self._samples_class_label_dict )
all_values = self._samples_class_label_dict.values()
for this_class_name in self._class_names:
trues = list(filter( lambda x: x == this_class_name, all_values ))
prior_for_this_class = (1.0 * len(trues)) / total_num_of_samples
self._class_priors_dict[this_class_name] = prior_for_this_class
this_class_name_in_cache = "".join(["prior::", this_class_name])
self._probability_cache[this_class_name_in_cache] = prior_for_this_class
return self._probability_cache[class_name_in_cache]
def calculate_class_priors(self):
print("\nEstimating class priors...")
if len(self._class_priors_dict) > 1: return
for class_name in self._class_names:
class_name_in_cache = "".join(["prior::", class_name])
total_num_of_samples = len( self._samples_class_label_dict )
all_values = self._samples_class_label_dict.values()
trues = list(filter( lambda x: x == class_name, all_values ))
prior_for_this_class = (1.0 * len(trues)) / total_num_of_samples
self._class_priors_dict[class_name] = prior_for_this_class
this_class_name_in_cache = "".join(["prior::", class_name])
self._probability_cache[this_class_name_in_cache] = prior_for_this_class
if self._debug2: print(str(self._class_priors_dict))
def probability_of_feature_value(self, feature_name, value):
value = convert(value)
if (value is not None) and (feature_name in self._sampling_points_for_numeric_feature_dict):
value = closest_sampling_point(convert(value), self._sampling_points_for_numeric_feature_dict[feature_name])
if value is not None:
feature_and_value = "".join([feature_name, "=", str(convert(value))])
if (value is not None) and (feature_and_value in self._probability_cache):
return self._probability_cache[feature_and_value]
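        # What follows constructs, on first use, a histogram-style discretization for a
        # truly numeric feature: the bin width (histogram_delta) is roughly twice the
        # median gap between successive unique values; if that comes out smaller than
        # diffrange/500, it is reset to diffrange/number_of_histogram_bins when that
        # constructor option was supplied, and to diffrange/500 otherwise.  The sampling
        # points are laid out as valuerange[0] + j * histogram_delta, and the probability
        # at each sampling point is the fraction of the training values lying within
        # histogram_delta of that point (a value can be counted toward more than one point).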
        histogram_delta = num_of_histogram_bins = valuerange = diffrange = None
if feature_name in self._numeric_features_valuerange_dict:
if self._feature_values_how_many_uniques_dict[feature_name] > self._symbolic_to_numeric_cardinality_threshold:
if feature_name not in self._sampling_points_for_numeric_feature_dict:
valuerange = self._numeric_features_valuerange_dict[feature_name]
diffrange = valuerange[1] - valuerange[0]
unique_values_for_feature = sorted(list(set(filter(lambda x: x != 'NA',
self._features_and_values_dict[feature_name]))))
diffs = sorted([unique_values_for_feature[i] - unique_values_for_feature[i-1]
for i in range(1,len(unique_values_for_feature))])
median_diff = diffs[int(len(diffs)/2) - 1]
histogram_delta = median_diff * 2
if histogram_delta < diffrange / 500:
if self._number_of_histogram_bins:
histogram_delta = diffrange / self._number_of_histogram_bins
else:
                            histogram_delta = diffrange / 500
self._histogram_delta_dict[feature_name] = histogram_delta
num_of_histogram_bins = int(diffrange / histogram_delta) + 1
self._num_of_histogram_bins_dict[feature_name] = num_of_histogram_bins
sampling_points_for_feature = [valuerange[0] + histogram_delta * j for j in range(num_of_histogram_bins)]
self._sampling_points_for_numeric_feature_dict[feature_name] = sampling_points_for_feature
if feature_name in self._numeric_features_valuerange_dict:
if self._feature_values_how_many_uniques_dict[feature_name] > self._symbolic_to_numeric_cardinality_threshold:
sampling_points_for_feature = self._sampling_points_for_numeric_feature_dict[feature_name]
counts_at_sampling_points = [0] * len(sampling_points_for_feature)
actual_values_for_feature = self._features_and_values_dict[feature_name]
actual_values_for_feature = list(filter(lambda x: x != 'NA', actual_values_for_feature))
for i in range(len(sampling_points_for_feature)):
for j in range(len(actual_values_for_feature)):
if abs(sampling_points_for_feature[i] -
convert(actual_values_for_feature[j])) < (histogram_delta):
counts_at_sampling_points[i] += 1
total_counts = functools.reduce(lambda x,y:x+y, counts_at_sampling_points)
probs = [x / (1.0 * total_counts) for x in counts_at_sampling_points]
sum_probs = functools.reduce(lambda x,y:x+y, probs)
bin_prob_dict = {sampling_points_for_feature[i] : probs[i]
for i in range(len(sampling_points_for_feature))}
self._prob_distribution_numeric_features_dict[feature_name] = bin_prob_dict
values_for_feature = list(map(lambda x: feature_name + "=" + x, map(str, sampling_points_for_feature)))
for i in range(0, len(values_for_feature)):
self._probability_cache[values_for_feature[i]] = probs[i]
if (value is not None) and (feature_and_value in self._probability_cache):
return self._probability_cache[feature_and_value]
else:
return 0
else:
# This section is for those numeric features treated symbolically:
values_for_feature = list(set(self._features_and_values_dict[feature_name]))
values_for_feature = list(filter(lambda x: x != 'NA', values_for_feature))
values_for_feature = list(map(lambda x: "".join([feature_name,"=",x]), map(str,values_for_feature)))
value_counts = [0] * len(values_for_feature)
for sample in sorted(self._training_data_dict.keys(), key = lambda x: sample_index(x) ):
features_and_values = self._training_data_dict[sample]
for i in range(0, len(values_for_feature)):
for current_value in (features_and_values):
if values_for_feature[i] == current_value:
value_counts[i] += 1
total_counts = functools.reduce(lambda x,y:x+y, value_counts)
if total_counts == 0:
raise Exception('''PFV Something is wrong with your training file. '''
'''It contains no training samples for feature named %s ''' % feature_name)
probs = [x / (1.0 * total_counts) for x in value_counts]
for i in range(0, len(values_for_feature)):
self._probability_cache[values_for_feature[i]] = probs[i]
if (value is not None) and (feature_and_value in self._probability_cache):
return self._probability_cache[feature_and_value]
else:
return 0
else:
# This section is only for purely symbolic features:
values_for_feature = self._features_and_values_dict[feature_name]
            values_for_feature = list(map(lambda x: feature_name + "=" + str(x), values_for_feature))
value_counts = [0] * len(values_for_feature)
for sample in sorted(self._training_data_dict.keys(), key = lambda x: sample_index(x) ):
features_and_values = self._training_data_dict[sample]
for i in range(0, len(values_for_feature)):
for current_value in features_and_values:
if values_for_feature[i] == current_value:
value_counts[i] += 1
for i in range(0, len(values_for_feature)):
self._probability_cache[values_for_feature[i]] = value_counts[i] / (1.0 * len(self._training_data_dict))
if (value is not None) and (feature_and_value in self._probability_cache):
return self._probability_cache[feature_and_value]
else:
return 0
def probability_of_feature_value_given_class(self,feature_name,feature_value,class_name):
feature_value = convert(feature_value)
histogram_delta = num_of_histogram_bins = valuerange = diffrange = None
if feature_name in self._sampling_points_for_numeric_feature_dict:
feature_value = closest_sampling_point(convert(feature_value), \
self._sampling_points_for_numeric_feature_dict[feature_name])
feature_value_class = "".join([feature_name,"=",str(feature_value),"::",class_name])
if feature_value_class in self._probability_cache:
return self._probability_cache[feature_value_class]
if feature_name in self._numeric_features_valuerange_dict:
if self._feature_values_how_many_uniques_dict[feature_name] > self._symbolic_to_numeric_cardinality_threshold:
histogram_delta = self._histogram_delta_dict[feature_name]
num_of_histogram_bins = self._num_of_histogram_bins_dict[feature_name]
valuerange = self._numeric_features_valuerange_dict[feature_name]
diffrange = valuerange[1] - valuerange[0]
samples_for_class = []
# Accumulate all samples names for the given class:
for sample_name in self._samples_class_label_dict:
if self._samples_class_label_dict[sample_name] == class_name:
samples_for_class.append(sample_name)
if feature_name in self._numeric_features_valuerange_dict:
if self._feature_values_how_many_uniques_dict[feature_name] > self._symbolic_to_numeric_cardinality_threshold:
sampling_points_for_feature = self._sampling_points_for_numeric_feature_dict[feature_name]
counts_at_sampling_points = [0] * len(sampling_points_for_feature)
actual_feature_values_for_samples_in_class = []
for sample in samples_for_class:
for feature_and_value in self._training_data_dict[sample]:
pattern = r'(.+)=(.+)'
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
if feature == feature_name and value != 'NA':
actual_feature_values_for_samples_in_class.append(convert(value))
for i in range(len(sampling_points_for_feature)):
for j in range(len(actual_feature_values_for_samples_in_class)):
if abs(sampling_points_for_feature[i] -
actual_feature_values_for_samples_in_class[j]) < histogram_delta:
counts_at_sampling_points[i] += 1
                total_counts = functools.reduce(lambda x,y:x+y, counts_at_sampling_points)
                if total_counts == 0:
                    raise Exception('''PFVC1 Something is wrong with your training file. It contains no training '''
                                    '''samples for Class %s and Feature %s''' % (class_name, feature_name))
                probs = [x / (1.0 * total_counts) for x in counts_at_sampling_points]
values_for_feature_and_class = list(map(lambda x: feature_name + "=" + x +
"::" + class_name, map(str, sampling_points_for_feature)))
for i in range(0, len(values_for_feature_and_class)):
self._probability_cache[values_for_feature_and_class[i]] = probs[i]
if feature_value_class in self._probability_cache:
return self._probability_cache[feature_value_class]
else:
return 0
else:
# We now take care of numeric features with a small number of unique values
values_for_feature = list(set(self._features_and_values_dict[feature_name]))
values_for_feature = list(filter(lambda x: x != 'NA', values_for_feature))
values_for_feature =list(map(lambda x: "".join([feature_name,"=",x]), map(str,map(convert, values_for_feature))))
value_counts = [0] * len(values_for_feature)
for sample in samples_for_class:
features_and_values = self._training_data_dict[sample]
for i in range(0, len(values_for_feature)):
for current_value in (features_and_values):
if values_for_feature[i] == current_value:
value_counts[i] += 1
total_count = functools.reduce(lambda x,y:x+y, value_counts)
if total_count == 0:
raise Exception('''PFVC2 Something is wrong with your training file. It contains no training samples '''
'''for Class %s and Feature %s''' % (class_name, feature_name))
# We normalize by total_count because the probabilities are
# conditioned on a given class
for i in range(len(values_for_feature)):
feature_and_value_and_class = values_for_feature[i] + "::" + class_name
self._probability_cache[feature_and_value_and_class] = value_counts[i] / (1.0 * total_count)
if feature_value_class in self._probability_cache:
return self._probability_cache[feature_value_class]
else:
return 0
else:
# This section is for purely symbolic features
values_for_feature = list(set(self._features_and_values_dict[feature_name]))
values_for_feature = \
list(map(lambda x: "".join([feature_name,"=",x]), map(str,values_for_feature)))
value_counts = [0] * len(values_for_feature)
for sample in samples_for_class:
features_and_values = self._training_data_dict[sample]
for i in range(len(values_for_feature)):
for current_value in (features_and_values):
if values_for_feature[i] == current_value:
value_counts[i] += 1
total_count = functools.reduce(lambda x,y:x+y, value_counts)
if total_count == 0:
raise Exception('''PFVC3 Something is wrong with your training file. It contains no training samples '''
'''for Class %s and Feature %s''' % (class_name, feature_name))
# We normalize by total_count because the probabilities are
# conditioned on a given class
for i in range(0, len(values_for_feature)):
feature_and_value_for_class = "".join([values_for_feature[i],"::",class_name])
self._probability_cache[feature_and_value_for_class] = value_counts[i] / (1.0 * total_count)
feature_and_value_and_class = "".join([feature_name,"=", feature_value,"::",class_name])
if feature_and_value_and_class in self._probability_cache:
return self._probability_cache[feature_and_value_and_class]
else:
return 0
def probability_of_feature_less_than_threshold(self, feature_name, threshold):
threshold = convert(threshold)
feature_threshold_combo = feature_name + '<' + str(threshold)
if feature_threshold_combo in self._probability_cache:
return self._probability_cache[feature_threshold_combo]
all_values = list(filter(lambda x: x != 'NA', self._features_and_values_dict[feature_name]))
all_values_less_than_threshold = list(filter(lambda x: x <= threshold, all_values))
probability = 1.0 * len(all_values_less_than_threshold) / len(all_values)
self._probability_cache[feature_threshold_combo] = probability
return probability
def probability_of_feature_less_than_threshold_given_class(self, feature_name, threshold, class_name):
threshold = convert(threshold)
feature_threshold_class_combo = "".join([feature_name,'<',str(threshold),"::",class_name])
if feature_threshold_class_combo in self._probability_cache:
return self._probability_cache[feature_threshold_class_combo]
data_samples_for_class = []
# Accumulate all sample names for the given class:
for sample_name in self._samples_class_label_dict:
if self._samples_class_label_dict[sample_name] == class_name:
data_samples_for_class.append(sample_name)
actual_feature_values_for_samples_in_class = []
for sample in data_samples_for_class:
for feature_and_value in self._training_data_dict[sample]:
pattern = r'(.+)=(.+)'
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
if feature == feature_name and value != 'NA':
actual_feature_values_for_samples_in_class.append(convert(value))
actual_points_for_feature_less_than_threshold = list(filter(lambda x: x <= threshold,
actual_feature_values_for_samples_in_class))
# The conditional in the assignment shown below was introduced in Version 3.2.1:
probability = ((1.0 * len(actual_points_for_feature_less_than_threshold)) / len(actual_feature_values_for_samples_in_class)) if len(actual_feature_values_for_samples_in_class) > 0 else 0.0
self._probability_cache[feature_threshold_class_combo] = probability
return probability
def probability_of_a_sequence_of_features_and_values_or_thresholds(self,array_of_features_and_values_or_thresholds):
'''
This method requires that all truly numeric types only be expressed as '<' or '>'
constructs in the array of branch features and thresholds
'''
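# A hedged illustration of the expected argument format (the feature names and
# values below are hypothetical, and `dt' stands for a DecisionTree instance):
# symbolic features appear as 'feature=value' strings, while truly numeric
# features appear only as 'feature<threshold' or 'feature>threshold' strings:
#
#     prob = dt.probability_of_a_sequence_of_features_and_values_or_thresholds(
#                                ['smoking=heavy', 'age>50', 'weight<200'] )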
if len(array_of_features_and_values_or_thresholds) == 0: return
sequence = ":".join(array_of_features_and_values_or_thresholds)
if sequence in self._probability_cache:
return self._probability_cache[sequence]
probability = None
pattern1 = r'(.+)=(.+)'
pattern2 = r'(.+)<(.+)'
pattern3 = r'(.+)>(.+)'
true_numeric_types = []
true_numeric_types_feature_names = []
symbolic_types = []
symbolic_types_feature_names = []
for item in array_of_features_and_values_or_thresholds:
if re.search(pattern2, item):
true_numeric_types.append(item)
m = re.search(pattern2, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
elif re.search(pattern3, item):
true_numeric_types.append(item)
m = re.search(pattern3, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
else:
symbolic_types.append(item)
m = re.search(pattern1, item)
feature,value = m.group(1),m.group(2)
symbolic_types_feature_names.append(feature)
true_numeric_types_feature_names = list(set(true_numeric_types_feature_names))
symbolic_types_feature_names = list(set(symbolic_types_feature_names))
bounded_intervals_numeric_types = self.find_bounded_intervals_for_numeric_features(true_numeric_types)
# Calculate the upper and the lower bounds to be used when searching for the best
# threshold for each of the numeric features that are in play at the current node:
upperbound = {feature : None for feature in true_numeric_types_feature_names}
lowerbound = {feature : None for feature in true_numeric_types_feature_names}
for item in bounded_intervals_numeric_types:
if item[1] == '>':
lowerbound[item[0]] = float(item[2])
else:
upperbound[item[0]] = float(item[2])
for feature_name in true_numeric_types_feature_names:
if lowerbound[feature_name] is not None and upperbound[feature_name] is not None \
and upperbound[feature_name] <= lowerbound[feature_name]:
return 0
elif lowerbound[feature_name] is not None and upperbound[feature_name] is not None:
if not probability:
probability = self.probability_of_feature_less_than_threshold(feature_name, upperbound[feature_name]) - \
self.probability_of_feature_less_than_threshold(feature_name, lowerbound[feature_name])
else:
probability *= (self.probability_of_feature_less_than_threshold(feature_name, upperbound[feature_name]) - \
self.probability_of_feature_less_than_threshold(feature_name, lowerbound[feature_name]))
elif upperbound[feature_name] is not None and lowerbound[feature_name] is None:
if not probability:
probability = self.probability_of_feature_less_than_threshold(feature_name, upperbound[feature_name])
else:
probability *= self.probability_of_feature_less_than_threshold(feature_name, upperbound[feature_name])
elif lowerbound[feature_name] is not None and upperbound[feature_name] is None:
if not probability:
probability = 1.0 -self.probability_of_feature_less_than_threshold(feature_name, lowerbound[feature_name])
else:
probability *= (1.0 - self.probability_of_feature_less_than_threshold(feature_name,
lowerbound[feature_name]))
else:
raise SyntaxError("Ill formatted call to 'probability_of_sequence' method")
for feature_and_value in symbolic_types:
if re.search(pattern1, feature_and_value):
m = re.search(pattern1, feature_and_value)
feature,value = m.group(1),m.group(2)
if not probability:
probability = self.probability_of_feature_value(feature, value)
else:
probability *= self.probability_of_feature_value(feature, value)
self._probability_cache[sequence] = probability
return probability
def probability_of_a_sequence_of_features_and_values_or_thresholds_given_class(self,
array_of_features_and_values_or_thresholds, class_name):
'''
This method requires that all truly numeric types only be expressed as '<' or '>'
constructs in the array of branch features and thresholds
'''
if len(array_of_features_and_values_or_thresholds) == 0: return
sequence = ":".join(array_of_features_and_values_or_thresholds)
sequence_with_class = sequence + "::" + class_name
if sequence_with_class in self._probability_cache:
return self._probability_cache[sequence_with_class]
probability = None
pattern1 = r'(.+)=(.+)'
pattern2 = r'(.+)<(.+)'
pattern3 = r'(.+)>(.+)'
true_numeric_types = []
true_numeric_types_feature_names = []
symbolic_types = []
symbolic_types_feature_names = []
for item in array_of_features_and_values_or_thresholds:
if re.search(pattern2, item):
true_numeric_types.append(item)
m = re.search(pattern2, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
elif re.search(pattern3, item):
true_numeric_types.append(item)
m = re.search(pattern3, item)
feature,value = m.group(1),m.group(2)
true_numeric_types_feature_names.append(feature)
else:
symbolic_types.append(item)
m = re.search(pattern1, item)
feature,value = m.group(1),m.group(2)
symbolic_types_feature_names.append(feature)
true_numeric_types_feature_names = list(set(true_numeric_types_feature_names))
symbolic_types_feature_names = list(set(symbolic_types_feature_names))
bounded_intervals_numeric_types = self.find_bounded_intervals_for_numeric_features(true_numeric_types)
# Calculate the upper and the lower bounds to be used when searching for the best
# threshold for each of the numeric features that are in play at the current node:
upperbound = {feature : None for feature in true_numeric_types_feature_names}
lowerbound = {feature : None for feature in true_numeric_types_feature_names}
for item in bounded_intervals_numeric_types:
if item[1] == '>':
lowerbound[item[0]] = float(item[2])
else:
upperbound[item[0]] = float(item[2])
for feature_name in true_numeric_types_feature_names:
if lowerbound[feature_name] is not None and upperbound[feature_name] is not None \
and upperbound[feature_name] <= lowerbound[feature_name]:
return 0
elif lowerbound[feature_name] is not None and upperbound[feature_name] is not None:
if not probability:
probability = self.probability_of_feature_less_than_threshold_given_class(feature_name,
upperbound[feature_name], class_name) - \
self.probability_of_feature_less_than_threshold_given_class(feature_name,
lowerbound[feature_name], class_name)
else:
probability *= (self.probability_of_feature_less_than_threshold_given_class(feature_name,
upperbound[feature_name], class_name) - \
self.probability_of_feature_less_than_threshold_given_class(feature_name,
lowerbound[feature_name], class_name))
elif upperbound[feature_name] is not None and lowerbound[feature_name] is None:
if not probability:
probability = self.probability_of_feature_less_than_threshold_given_class(feature_name,
upperbound[feature_name], class_name)
else:
probability *= self.probability_of_feature_less_than_threshold_given_class(feature_name,
upperbound[feature_name], class_name)
elif lowerbound[feature_name] is not None and upperbound[feature_name] is None:
if not probability:
probability = 1.0 - \
self.probability_of_feature_less_than_threshold_given_class(feature_name,
lowerbound[feature_name], class_name)
else:
probability *= (1.0 -
self.probability_of_feature_less_than_threshold_given_class(feature_name,
lowerbound[feature_name], class_name))
else:
raise SyntaxError("Ill formatted call to 'probability of sequence with class' method")
for feature_and_value in symbolic_types:
if re.search(pattern1, feature_and_value):
m = re.search(pattern1, feature_and_value)
feature,value = m.group(1),m.group(2)
if not probability:
probability = self.probability_of_feature_value_given_class(feature, value, class_name)
else:
probability *= self.probability_of_feature_value_given_class(feature, value, class_name)
self._probability_cache[sequence_with_class] = probability
return probability
def probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(self,
class_name, array_of_features_and_values_or_thresholds):
sequence = ":".join(array_of_features_and_values_or_thresholds)
class_and_sequence = "".join([class_name, "::", sequence])
if class_and_sequence in self._probability_cache:
return self._probability_cache[class_and_sequence]
array_of_class_probabilities = [0] * len(self._class_names)
for i in range(len(self._class_names)):
class_name = self._class_names[i]
prob = self.probability_of_a_sequence_of_features_and_values_or_thresholds_given_class(
array_of_features_and_values_or_thresholds, class_name)
if prob < 0.000001:
array_of_class_probabilities[i] = 0.0
continue
prob_of_feature_sequence = self.probability_of_a_sequence_of_features_and_values_or_thresholds(
array_of_features_and_values_or_thresholds)
##Commented out in 2.2.4
#if not prob_of_feature_sequence:
# sys.exit('''PCS Something is wrong with your sequence of feature values and thresholds in '''
# '''probability_of_a_class_given_sequence_of_features_and_values_or_thresholds()''')
prior = self._class_priors_dict[self._class_names[i]]
if prob_of_feature_sequence:
array_of_class_probabilities[i] = prob * prior / prob_of_feature_sequence
else:
array_of_class_probabilities[i] = prior
sum_probability = functools.reduce(lambda x,y:x+y, array_of_class_probabilities)
if sum_probability == 0:
array_of_class_probabilities = [1.0/len(self._class_names)] * len(self._class_names)
else:
array_of_class_probabilities = \
list(map(lambda x: x / sum_probability, array_of_class_probabilities))
for i in range(len(self._class_names)):
this_class_and_sequence = "".join([self._class_names[i], "::", sequence])
self._probability_cache[this_class_and_sequence] = array_of_class_probabilities[i]
return self._probability_cache[class_and_sequence]
#--------------------------------- Class Based Utilities ------------------------------------
def determine_data_condition(self):
'''
This method estimates the worst-case fan-out of the decision tree taking into
account the number of values (and therefore the number of branches emanating
from a node) for the symbolic features.
'''
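# A quick worked example of the estimate computed below: with, say, 4 symbolic
# features whose largest value set has 4 entries, the worst-case fan-out is
# 4 ** 4 = 256 nodes.  The tree actually constructed is usually much smaller,
# since node expansion stops when the entropy gain falls below entropy_threshold
# or when the tree reaches max_depth_desired.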
num_of_features = len(self._feature_names)
values = []
for feature in self._features_and_unique_values_dict:
if feature not in self._numeric_features_valuerange_dict:
values.append(self._features_and_unique_values_dict[feature])
# return if no symbolic features found:
if not values: return
print("Number of features: " + str(num_of_features))
max_num_values = max(list(map(len, values)))
print("Largest number of values for symbolic features is: " + str(max_num_values))
estimated_number_of_nodes = max_num_values ** num_of_features
print('''\nWORST CASE SCENARIO: The decision tree COULD have as many as %s '''
''' nodes. The exact number of nodes created depends critically on '''
'''the entropy_threshold used for node expansion (the default value '''
'''for this threshold is 0.01) and on the value set for max_depth_desired, '''
'''which limits the depth of the tree\n''' % estimated_number_of_nodes)
if estimated_number_of_nodes > 10000:
print('''THIS IS WAY TOO MANY NODES. Consider using a relatively '''
'''large value for entropy_threshold and/or a small value for '''
'''max_depth_desired to reduce the number of nodes created''')
ans = None
if sys.version_info[0] == 3:
ans = input("\nDo you wish to continue? Enter 'y' if yes: ")
else:
ans = raw_input("\nDo you wish to continue? Enter 'y' if yes: ")
ans = ans.strip()
if ans != 'y':
sys.exit(0)
def _check_names_used(self, features_and_values_test_data):
'''
This method is used to verify that you used legal feature names in the test
sample that you want to classify with the decision tree.
'''
for feature_and_value in features_and_values_test_data:
pattern = r'(\S+)\s*=\s*(\S+)'
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
if feature is None or value is None:
raise Exception("Your test data has formatting error")
if feature not in self._feature_names:
return 0
return 1
def get_class_names(self):
return self._class_names
def find_bounded_intervals_for_numeric_features(self, arr):
'''
Given a list of branch attributes for the numeric features of the form, say,
['g2<1','g2<2','g2<3','age>34','age>36','age>37'], this method returns the
smallest list that is relevant for the purpose of calculating the
probabilities. To explain, the probability that the feature `g2' is less
than 1 AND, at the same time, less than 2, AND, at the same time, less than
3, is the same as the probability that the feature is less than 1. Similarly,
the probability that 'age' is greater than 34 and also greater than 37 is the
same as `age' being greater than 37.
'''
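# To continue the docstring's example, the reduction carried out below keeps only
# the tightest bound in each direction for each feature.  Splitting each item on
# '<' or '>' turns ['g2<1','g2<2','g2<3','age>34','age>36','age>37'] into triples,
# and the surviving triples are [['g2','<','1'], ['age','>','37']].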
features = self._feature_names
arr1 = list(map(lambda x: re.split(r'(>|<)', x, 0), arr))
# make a separate list for each feature name:
arr3 = list(filter(lambda x: len(x)>0, [list(filter(lambda x: x[0]==y, arr1)) for y in features]))
# Sort each list so that '<' entries occur before '>' entries:
arr4 = [sorted(li, key=lambda x: x[1]) for li in arr3]
arr5 = [[list(filter(lambda x: x[1]==y, alist))] for alist in arr4 for y in ['<','>']]
arr6 = []
for i in range(len(arr5)):
arr6 += [sorted(li, key=lambda x: float(x[2])) for li in arr5[i]]
arr7 = list(itertools.chain(*arr6))
arr8 = list(filter(lambda x: len(x)>0, [list(filter(lambda x: x[0]==y, arr7)) for y in features]))
arr9 = []
for alist in arr8:
newalist = []
if alist[0][1] == '<':
newalist.append(alist[0])
else:
newalist.append(alist[-1])
if alist[0][1] != alist[-1][1]:
newalist.append(alist[-1])
arr9.append(newalist)
arr10 = list(itertools.chain(*arr9))
return arr10
#------------------------------------------- Class DTNode ------------------------------------------
class DTNode(object):
'''
The nodes of a decision tree are instances of this class:
'''
def __init__(self, feature, entropy, class_probabilities, branch_features_and_values_or_thresholds, dt,
root_or_not = None):
if root_or_not == 'root':
dt.nodes_created = -1
dt.class_names = None
self._dt = dt
self._serial_number = self.get_next_serial_num()
self._feature = feature
self._node_creation_entropy = entropy
self._class_probabilities = class_probabilities
self._branch_features_and_values_or_thresholds = branch_features_and_values_or_thresholds
self._linked_to = []
def how_many_nodes(self):
return self._dt.nodes_created + 1
def set_class_names(self, class_names_list):
self._dt.class_names = class_names_list
def get_class_names(self):
return self._dt.class_names
def set_node_creation_entropy(self, entropy):
self._node_creation_entropy = entropy
def get_next_serial_num(self):
self._dt.nodes_created += 1
return self._dt.nodes_created
def get_serial_num(self):
return self._serial_number
def get_feature(self):
'''
Returns the feature test at the current node
'''
return self._feature
def set_feature(self, feature):
self._feature = feature
def get_node_entropy(self):
return self._node_creation_entropy
def get_class_probabilities(self):
return self._class_probabilities
def get_branch_features_and_values_or_thresholds(self):
return self._branch_features_and_values_or_thresholds
def get_children(self):
return self._linked_to
def add_child_link(self, new_node):
self._linked_to.append(new_node)
def delete_all_links(self):
self._linked_to = None
def display_node(self):
feature_at_node = self.get_feature() or " "
node_creation_entropy_at_node = self.get_node_entropy()
print_node_creation_entropy_at_node = "%.3f" % node_creation_entropy_at_node
class_probabilities = self.get_class_probabilities()
class_probabilities_for_display = ["%0.3f" % x for x in class_probabilities]
serial_num = self.get_serial_num()
branch_features_and_values_or_thresholds = self.get_branch_features_and_values_or_thresholds()
print("\n\nNODE " + str(serial_num) +
":\n Branch features and values to this node: " +
str(branch_features_and_values_or_thresholds) +
"\n Class probabilities at current node: " +
str(class_probabilities_for_display) +
"\n Entropy at current node: " +
print_node_creation_entropy_at_node +
"\n Best feature test at current node: " + feature_at_node + "\n\n")
def display_decision_tree(self, offset):
serial_num = self.get_serial_num()
if len(self.get_children()) > 0:
feature_at_node = self.get_feature() or " "
node_creation_entropy_at_node = self.get_node_entropy()
print_node_creation_entropy_at_node = "%.3f" % node_creation_entropy_at_node
branch_features_and_values_or_thresholds = self.get_branch_features_and_values_or_thresholds()
class_probabilities = self.get_class_probabilities()
print_class_probabilities = ["%.3f" % x for x in class_probabilities]
print_class_probabilities_with_class = [self.get_class_names()[i] + " => " +
print_class_probabilities[i] for i in range(len(self.get_class_names()))]
print("NODE " + str(serial_num) + ": " + offset + "BRANCH TESTS TO NODE: " +
str(branch_features_and_values_or_thresholds))
second_line_offset = offset + " " * (8 + len(str(serial_num)))
print(second_line_offset + "Decision Feature: " +
feature_at_node + " Node Creation Entropy: " + print_node_creation_entropy_at_node +
" Class Probs: " + str(print_class_probabilities_with_class) + "\n")
offset += " "
for child in self.get_children():
child.display_decision_tree(offset)
else:
node_creation_entropy_at_node = self.get_node_entropy()
print_node_creation_entropy_at_node = "%.3f" % node_creation_entropy_at_node
branch_features_and_values_or_thresholds = self.get_branch_features_and_values_or_thresholds()
class_probabilities = self.get_class_probabilities()
print_class_probabilities = ["%.3f" % x for x in class_probabilities]
print_class_probabilities_with_class = [self.get_class_names()[i] + " => " +
print_class_probabilities[i] for i in range(len(self.get_class_names()))]
print("NODE " + str(serial_num) + ": " + offset + "BRANCH TESTS TO LEAF NODE: " +
str(branch_features_and_values_or_thresholds))
second_line_offset = offset + " " * (8 + len(str(serial_num)))
print(second_line_offset + "Node Creation Entropy: " + print_node_creation_entropy_at_node +
" Class Probs: " + str(print_class_probabilities_with_class) + "\n")
#----------------------------- Evaluate Quality of Training Data ----------------------------
class EvalTrainingData(DecisionTree):
def __init__(self, *args, **kwargs ):
DecisionTree.__init__(self, *args, **kwargs)
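# A minimal usage sketch (the constructor accepts the same keyword arguments as
# DecisionTree; the CSV file name below is a placeholder):
#
#     eval_data = EvalTrainingData( training_datafile = 'my_training.csv',
#                                   csv_class_column_index = 1,
#                                   csv_columns_for_features = [2,3,4,5],
#                                   entropy_threshold = 0.01,
#                                   max_depth_desired = 5 )
#     eval_data.get_training_data()
#     eval_data.evaluate_training_data()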
def evaluate_training_data(self):
evaldebug = 0
if not self._training_datafile.endswith('.csv'):
raise SyntaxError('''The data evaluation function in the module can only be used when your '''
'''training data is in a CSV file''')
print('''\nWill run a 10-fold cross-validation test on your training data to test its '''
'''class-discriminatory power:''')
all_training_data = self._training_data_dict
all_sample_names = sorted(all_training_data.keys(), key = lambda x: sample_index(x))
fold_size = int(0.1 * len(all_training_data))
confusion_matrix = {class_name : {class_name : 0 for class_name in self._class_names} \
for class_name in self._class_names}
for fold_index in range(10):
print("\nStarting the iteration indexed %d of the 10-fold cross-validation test" % fold_index)
testing_samples = all_sample_names[fold_size * fold_index : fold_size * (fold_index+1)]
training_samples = all_sample_names[0 : fold_size * fold_index] + \
all_sample_names[fold_size * (fold_index+1):]
testing_data = { x : all_training_data[x] for x in testing_samples }
training_data = { x : all_training_data[x] for x in training_samples }
trainingDT = DecisionTree('evalmode')
trainingDT._training_data_dict = training_data
trainingDT._class_names = self._class_names
trainingDT._feature_names = self._feature_names
trainingDT._entropy_threshold = self._entropy_threshold
trainingDT._max_depth_desired = self._max_depth_desired
trainingDT._symbolic_to_numeric_cardinality_threshold = self._symbolic_to_numeric_cardinality_threshold
trainingDT._samples_class_label_dict = {sample_name : self._samples_class_label_dict[sample_name]
for sample_name in training_samples}
trainingDT._features_and_values_dict = {feature : [] for feature in self._features_and_values_dict}
pattern = r'(\S+)\s*=\s*(\S+)'
for item in sorted(trainingDT._training_data_dict.items(), key = lambda x: sample_index(x[0])):
for feature_and_value in item[1]:
m = re.search(pattern, feature_and_value)
feature,value = m.group(1),m.group(2)
if value != 'NA':
trainingDT._features_and_values_dict[feature].append(convert(value))
trainingDT._features_and_unique_values_dict = {feature :
sorted(list(set(trainingDT._features_and_values_dict[feature]))) for
feature in trainingDT._features_and_values_dict}
trainingDT._numeric_features_valuerange_dict = {feature : []
for feature in self._numeric_features_valuerange_dict}
trainingDT._numeric_features_valuerange_dict = {feature :
[min(trainingDT._features_and_unique_values_dict[feature]),
max(trainingDT._features_and_unique_values_dict[feature])]
for feature in self._numeric_features_valuerange_dict}
if evaldebug:
print("\n\nprinting samples in the testing set: " + str(testing_samples))
print("\n\nPrinting features and their values in the training set:\n")
for item in sorted(trainingDT._features_and_values_dict.items()):
print(item[0] + " => " + str(item[1]))
print("\n\nPrinting unique values for features:\n")
for item in sorted(trainingDT._features_and_unique_values_dict.items()):
print(item[0] + " => " + str(item[1]))
print("\n\nPrinting unique value ranges for features:\n")
for item in sorted(trainingDT._numeric_features_valuerange_dict.items()):
print(item[0] + " => " + str(item[1]))
trainingDT._feature_values_how_many_uniques_dict = {feature : []
for feature in self._features_and_unique_values_dict}
trainingDT._feature_values_how_many_uniques_dict = {feature :
len(trainingDT._features_and_unique_values_dict[feature])
for feature in self._features_and_unique_values_dict}
if evaldebug: trainingDT._debug2 = 1
trainingDT.calculate_first_order_probabilities()
trainingDT.calculate_class_priors()
root_node = trainingDT.construct_decision_tree_classifier()
if evaldebug:
root_node.display_decision_tree(" ")
for test_sample_name in testing_samples:
test_sample_data = all_training_data[test_sample_name]
if evaldebug:
print("original data in test sample:", str(test_sample_data))
test_sample_data = [x for x in test_sample_data if not x.endswith('=NA')]
if evaldebug:
print("data in test sample:", str(test_sample_data))
classification = trainingDT.classify(root_node, test_sample_data)
solution_path = classification['solution_path']
del classification['solution_path']
which_classes = list( classification.keys() )
which_classes = sorted(which_classes, key=lambda x: classification[x], reverse=True)
most_likely_class_label = which_classes[0]
if evaldebug:
print("\nClassification:\n")
print(" " + str.ljust("class name", 30) + "probability")
print(" ---------- -----------")
for which_class in which_classes:
if which_class != 'solution_path':
print(" " + str.ljust(which_class, 30) + str(classification[which_class]))
print("\nSolution path in the decision tree: " + str(solution_path))
print("\nNumber of nodes created: " + str(root_node.how_many_nodes()))
true_class_label_for_test_sample = self._samples_class_label_dict[test_sample_name]
if evaldebug:
print("%s: true_class: %s estimated_class: %s\n" % \
(test_sample_name, true_class_label_for_test_sample, most_likely_class_label))
confusion_matrix[true_class_label_for_test_sample][most_likely_class_label] += 1
print("\n\n DISPLAYING THE CONFUSION MATRIX FOR THE 10-FOLD CROSS-VALIDATION TEST:\n")
matrix_header = " " * 30
for class_name in self._class_names:
matrix_header += '{:^30}'.format(class_name)
print("\n" + matrix_header + "\n")
for row_class_name in sorted(confusion_matrix.keys()):
row_display = str.rjust(row_class_name, 30)
for col_class_name in sorted(confusion_matrix[row_class_name].keys()):
row_display += '{:^30}'.format(str(confusion_matrix[row_class_name][col_class_name]) )
print(row_display + "\n")
diagonal_sum, off_diagonal_sum = 0,0
for row_class_name in sorted(confusion_matrix.keys()):
for col_class_name in sorted(confusion_matrix[row_class_name].keys()):
if row_class_name == col_class_name:
diagonal_sum += confusion_matrix[row_class_name][col_class_name]
else:
off_diagonal_sum += confusion_matrix[row_class_name][col_class_name]
data_quality_index = 100.0 * diagonal_sum / (diagonal_sum + off_diagonal_sum)
print("\nTraining Data Quality Index: %s (out of a possible maximum of 100)" % data_quality_index)
if data_quality_index <= 80:
print( '''\nYour training data does not possess much class discriminatory '''
'''information. It could be that the classes are inherently not well '''
'''separable or that your constructor parameter choices are not appropriate.''')
elif 80 < data_quality_index <= 90:
print( '''\nYour training data possesses some class discriminatory information '''
'''but it may not be sufficient for real-world applications. You might '''
'''try tweaking the constructor parameters to see if that improves the '''
'''class discriminations.''')
elif 90 < data_quality_index <= 95:
print( '''\nYour training data appears to possess good class discriminatory '''
'''information. Whether or not it is acceptable would depend on your '''
'''application.''')
elif 95 < data_quality_index < 98:
print( '''\nYour training data is of very high quality.''')
else:
print('''\nYour training data is excellent.''')
#------------------------------ Generate Your Own Numeric Training Data -----------------------------
class TrainingDataGeneratorNumeric(object):
'''
See the example script generate_training_data_numeric.py on how to use this class
for generating your numeric training data. The training data is generated in
accordance with the specifications you place in a parameter file.
'''
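# A minimal usage sketch (the file names below are placeholders; the script
# generate_training_data_numeric.py in the Examples directory shows a complete
# working invocation):
#
#     generator = TrainingDataGeneratorNumeric(
#                          output_csv_file = 'my_numeric_training.csv',
#                          parameter_file = 'my_param_file.txt',
#                          number_of_samples_per_class = 100 )
#     generator.read_parameter_file_numeric()
#     generator.gen_numeric_training_data_and_write_to_csv()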
def __init__(self, *args, **kwargs ):
if args:
raise SyntaxError('''TrainingDataGeneratorNumeric can only be called with keyword arguments '''
'''for the following keywords: output_csv_file, parameter_file, '''
'''number_of_samples_per_class, and debug''')
allowed_keys = 'output_csv_file','parameter_file','number_of_samples_per_class','debug'
keywords_used = kwargs.keys()
for keyword in keywords_used:
if keyword not in allowed_keys:
raise SyntaxError("Wrong keyword used --- check spelling")
output_csv_file = parameter_file = number_of_samples_per_class = debug = None
if 'output_csv_file' in kwargs : output_csv_file = kwargs.pop('output_csv_file')
if 'parameter_file' in kwargs : parameter_file = kwargs.pop('parameter_file')
if 'number_of_samples_per_class' in kwargs:
number_of_samples_per_class = kwargs.pop('number_of_samples_per_class')
if 'debug' in kwargs: debug = kwargs.pop('debug')
if output_csv_file:
self._output_csv_file = output_csv_file
else:
raise SyntaxError('''You must specify the name for a csv file for the training data''')
if parameter_file:
self._parameter_file = parameter_file
else:
raise SyntaxError('''You must specify a parameter file''')
if number_of_samples_per_class:
self._number_of_samples_per_class = number_of_samples_per_class
else:
raise SyntaxError('''You forgot to specify the number of training samples needed per class''')
if debug:
self._debug = debug
else:
self._debug = 0
self._class_names = []
self._features_ordered = []
self._class_names_and_priors = {}
self._features_with_value_range = {}
self._classes_and_their_param_values = {}
def read_parameter_file_numeric( self ):
'''
The training data generated by an instance of the class
TrainingDataGeneratorNumeric is based on the specs you place in a parameter
file that you supply to the class constructor through a constructor parameter
called `parameter_file'. This method parses the parameter file in order to
determine the names to be used for the different data classes, their means,
and their variances.
'''
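# Judging from the regular expressions used below, the parameter file is expected
# to contain entries along the following (purely illustrative) lines:
#
#     class names: class1 class2
#     class priors: 0.4 0.6
#     feature name: feature1
#     value range: 0.0 - 100.0
#     params for class: class1
#     mean: 50.0 30.0
#     covariance:
#          1.0  0.0
#          0.0  1.0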
class_names = []
class_names_and_priors = {}
features_with_value_range = {}
classes_and_their_param_values = {}
# regex8 = '[+-]?\ *(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?';
FILE = open(self._parameter_file)
params = FILE.read()
regex = r'class names: ([\w\s+]+)\W*class priors: ([\d.\s+]+)'
classes = re.search(regex, params, re.DOTALL | re.IGNORECASE)
if (classes != None):
class_names = list( filter( None, classes.group(1).strip().split(' ') ) )
class_priors = list( filter( None, classes.group(2).strip().split(' ') ) )
class_names_and_priors = {class_names[i] : float(class_priors[i]) for i in range(len(class_names))}
if self._debug: print("\nClass names and priors: " + str(class_names_and_priors))
regex = r'feature name: \w*.*?value range: [\d\. -]+'
features = re.findall(regex, params, re.DOTALL | re.IGNORECASE)
regex = r'feature name: (\w+)\W*?value range:\s*([\d. -]+)'
features_ordered = []
for feature in features:
feature_groups = re.match(regex, feature, re.IGNORECASE)
feature_name = feature_groups.group(1)
features_ordered.append(feature_name)
value_range = feature_groups.group(2).split()
value_range = [float(value_range[0]), float(value_range[2])]
features_with_value_range[feature_name] = value_range
if self._debug: print("\nFeatures and their value ranges: "+ str(features_with_value_range))
classes_and_their_param_values = {class_names[i] : {} for i in range(len(class_names))}
regex = r'params for class:\s+\w*?\W+?mean:[\d\.\s+]+\W*?covariance:\W+?(?:[\s+\d.]+\W+?)+'
class_params = re.findall(regex, params, re.DOTALL | re.IGNORECASE)
regex = r'params for class:\s+(\w+)\W*?mean:\s*([\d. -]+)\W*covariance:\s*([\s\d.]+)'
for class_param in class_params:
class_params_groups = re.match(regex, class_param, re.IGNORECASE)
class_name = class_params_groups.group(1)
class_mean = class_params_groups.group(2).split()
class_mean = list(map(float, class_mean))
classes_and_their_param_values[class_name]['mean'] = class_mean
vector_size = len(class_mean)
class_param_string = class_params_groups.group(3)
covar_rows = filter(None, \
list(map(lambda x: x.strip().split(), class_param_string.splitlines())))
covar_matrix = []
for row in covar_rows:
row = list(map(float, row))
covar_matrix.append(row)
classes_and_their_param_values[class_name]['covariance'] = covar_matrix
if self._debug: print("\nThe class parameters are: "+ str(classes_and_their_param_values))
self._class_names = class_names
self._class_names_and_priors = class_names_and_priors
self._features_with_value_range = features_with_value_range
self._classes_and_their_param_values = classes_and_their_param_values
self._features_ordered = features_ordered
def gen_numeric_training_data_and_write_to_csv(self):
'''
After the parameter file is parsed by the previous method, this method calls
on `numpy.random.multivariate_normal()' to generate the training data
samples. Your training data can have any number of dimensions, any mean,
and any covariance.
'''
import numpy
import random
samples_for_class = {class_name : [] for class_name in self._class_names}
for class_name in self._classes_and_their_param_values:
mean = self._classes_and_their_param_values[class_name]['mean']
covariance = self._classes_and_their_param_values[class_name]['covariance']
samples = numpy.random.multivariate_normal(mean, covariance, self._number_of_samples_per_class)
samples = [list(map(float, map(lambda x: "%.3f" % x, list_of_samples))) for list_of_samples in samples]
samples_for_class[class_name] = samples
data_records = []
for class_name in samples_for_class:
for sample_index in range(self._number_of_samples_per_class):
data_record = class_name + "," + \
','.join(map(lambda x: str(x), samples_for_class[class_name][sample_index]))
if self._debug: print("data record: " + data_record)
data_records.append(data_record)
random.shuffle(data_records)
sample_records = []
sample_records.append('""' + ',' + 'class_name' + ',' + ','.join(self._features_ordered) + "\n")
for i in range(len(data_records)):
i1 = i+1
sample_records.append(str(i1) + ',' + data_records[i] +"\n")
FILE = open(self._output_csv_file, 'w')
list(map(FILE.write, sample_records))
FILE.close()
#------------------------ Generate Your Own Symbolic Training Data --------------------------------
class TrainingDataGeneratorSymbolic(object):
'''
See the sample script generate_training_data_symbolic.py for how to use this
class for generating symbolic training data. The data is generated according to
the specifications you place in a parameter file.
'''
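# A minimal usage sketch (the file names below are placeholders; the script
# generate_training_data_symbolic.py in the Examples directory shows a complete
# working invocation):
#
#     generator = TrainingDataGeneratorSymbolic(
#                          output_datafile = 'my_symbolic_training.csv',
#                          parameter_file = 'my_param_file.txt',
#                          number_of_training_samples = 100,
#                          write_to_file = 1 )
#     generator.read_parameter_file_symbolic()
#     generator.gen_symbolic_training_data()
#     generator.write_training_data_to_file()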
def __init__(self, *args, **kwargs ):
if args:
raise SyntaxError('''TrainingDataGeneratorSymbolic can only be called with keyword arguments for the '''
'''following keywords: output_datafile, parameter_file, number_of_training_samples, '''
'''write_to_file, debug1, and debug2''')
allowed_keys = 'output_datafile','parameter_file','number_of_training_samples', 'write_to_file','debug1','debug2'
keywords_used = kwargs.keys()
for keyword in keywords_used:
if keyword not in allowed_keys:
raise SyntaxError("Wrong keyword used --- check spelling")
output_datafile = parameter_file = number_of_training_samples = None
write_to_file = debug1 = debug2 = None
if 'output_datafile' in kwargs: output_datafile = kwargs.pop('output_datafile')
if 'parameter_file' in kwargs: parameter_file = kwargs.pop('parameter_file')
if 'number_of_training_samples' in kwargs:
number_of_training_samples = kwargs.pop('number_of_training_samples')
if 'write_to_file' in kwargs : write_to_file = kwargs.pop('write_to_file')
if 'debug1' in kwargs: debug1 = kwargs.pop('debug1')
if 'debug2' in kwargs: debug2 = kwargs.pop('debug2')
if output_datafile:
self._output_datafile = output_datafile
else:
raise SyntaxError('''You must specify an output datafile''')
if parameter_file:
self._parameter_file = parameter_file
else:
raise SyntaxError('''You must specify a parameter file''')
if number_of_training_samples:
self._number_of_training_samples = number_of_training_samples
else:
raise SyntaxError('''You forgot to specify the number of training samples needed''')
if write_to_file:
self._write_to_file = write_to_file
else:
self._write_to_file = 0
if debug1:
self._debug1 = debug1
else:
self._debug1 = 0
if debug2:
self._debug2 = debug2
else:
self._debug2 = 0
self._training_sample_records = {}
self._features_and_values_dict = {}
self._bias_dict = {}
self._class_names = []
self._class_priors = []
def read_parameter_file_symbolic( self ):
'''
Read a parameter file for generating symbolic training data. See the script
generate_training_data_symbolic.py in the Examples directory for how to pass
the name of the parameter file to the constructor of the
TrainingDataGeneratorSymbolic class.
'''
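# Judging from the parsing logic below, the parameter file is expected to contain
# entries along the following (purely illustrative) lines:
#
#     class names: class1 class2
#     class priors: 0.9 0.1
#     feature: feature1
#     values: val1 val2 val3
#     bias: class: class1
#           feature1: val1=0.6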
debug1 = self._debug1
debug2 = self._debug2
write_to_file = self._write_to_file
number_of_training_samples = self._number_of_training_samples
input_parameter_file = self._parameter_file
all_params = []
param_string = ''
FILE = open(input_parameter_file, 'r')
all_params = FILE.read()
all_params = re.split(r'\n', all_params)
FILE.close()
pattern = r'^(?![ ]*#)'
try:
regex = re.compile( pattern )
except:
print("error in your pattern")
raise
all_params = list( filter( regex.search, all_params ) )
all_params = list( filter( None, all_params ) )
all_params = [x.rstrip('\n') for x in all_params]
param_string = ' '.join( all_params )
pattern = r'^\s*class names:(.*?)\s*class priors:(.*?)(feature: .*)'
m = re.search( pattern, param_string )
rest_params = m.group(3)
self._class_names = list( filter(None, re.split(r'\s+', m.group(1))) )
self._class_priors = list( filter(None, re.split(r'\s+', m.group(2))) )
pattern = r'(feature:.*?) (bias:.*)'
m = re.search( pattern, rest_params )
feature_string = m.group(1)
bias_string = m.group(2)
features_and_values_dict = {}
features = list( filter( None, re.split( r'(feature[:])', feature_string ) ) )
for item in features:
if re.match(r'feature', item): continue
splits = list( filter(None, re.split(r' ', item)) )
for i in range(0, len(splits)):
if i == 0: features_and_values_dict[splits[0]] = []
else:
if re.match( r'values', splits[i] ): continue
features_and_values_dict[splits[0]].append( splits[i] )
self._features_and_values_dict = features_and_values_dict
bias_dict = {}
biases = list( filter(None, re.split(r'(bias[:]\s*class[:])', bias_string )) )
for item in biases:
if re.match(r'bias', item): continue
splits = list( filter(None, re.split(r' ', item)) )
feature_name = ''
for i in range(0, len(splits)):
if i == 0:
bias_dict[splits[0]] = {}
elif ( re.search( r'(^.+)[:]$', splits[i] ) ):
m = re.search( r'(^.+)[:]$', splits[i] )
feature_name = m.group(1)
bias_dict[splits[0]][feature_name] = []
else:
if not feature_name: continue
bias_dict[splits[0]][feature_name].append( splits[i] )
self._bias_dict = bias_dict
if self._debug1:
print("\n\n")
print("Class names: " + str(self._class_names))
print("\n")
num_of_classes = len(self._class_names)
print("Number of classes: " + str(num_of_classes))
print("\n")
print("Class priors: " + str(self._class_priors))
print("\n\n")
print("Here are the features and their possible values")
print("\n")
items = self._features_and_values_dict.items()
for item in items:
print(item[0] + " ===> " + str(item[1]))
print("\n")
print("Here is the biasing for each class:")
print("\n")
items = self._bias_dict.items()
for item in items:
print("\n")
print(item[0])
items2 = list( item[1].items() )
for i in range(0, len(items2)):
print( items2[i])
def gen_symbolic_training_data( self ):
'''
This method generates training data according to the specifications
placed in a parameter file that is read by the previous method.
'''
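# The sampling below works by mapping the class priors (and, for each class, the
# feature-value biases) onto subintervals of the unit interval and then drawing a
# uniform random number to pick a class or a value.  A small worked example:
# priors of 0.4 and 0.6 for two classes map to the intervals (0, 0.4) and
# (0.4, 1.0), so a draw of 0.73 selects the second class.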
class_names = self._class_names
class_priors = self._class_priors
training_sample_records = {}
features_and_values_dict = self._features_and_values_dict
bias_dict = self._bias_dict
how_many_training_samples = self._number_of_training_samples
class_priors_to_unit_interval_map = {}
accumulated_interval = 0
for i in range(0, len(class_names)):
class_priors_to_unit_interval_map[class_names[i]] = \
(accumulated_interval, accumulated_interval+float(class_priors[i]))
accumulated_interval += float(class_priors[i])
if self._debug1:
print("Mapping of class priors to unit interval:")
print("\n")
items = class_priors_to_unit_interval_map.items()
for item in items:
print(item[0] + " ===> " + str(item[1]))
class_and_feature_based_value_priors_to_unit_interval_map = {}
for class_name in class_names:
class_and_feature_based_value_priors_to_unit_interval_map[class_name] = {}
for feature in features_and_values_dict:
class_and_feature_based_value_priors_to_unit_interval_map[class_name][feature] = {}
for class_name in class_names:
for feature in features_and_values_dict:
values = features_and_values_dict[feature]
if len(bias_dict[class_name][feature]) > 0:
bias_string = bias_dict[class_name][feature][0]
else:
no_bias = 1.0 / len(values)
bias_string = values[0] + "=" + str(no_bias)
value_priors_to_unit_interval_map = {}
splits = list( filter( None, re.split(r'\s*=\s*', bias_string) ) )
chosen_for_bias_value = splits[0]
chosen_bias = splits[1]
remaining_bias = 1 - float(chosen_bias)
remaining_portion_bias = remaining_bias / (len(values) -1)
accumulated = 0
for i in range(0, len(values)):
if (values[i] == chosen_for_bias_value):
value_priors_to_unit_interval_map[values[i]] = \
[accumulated, accumulated + float(chosen_bias)]
accumulated += float(chosen_bias)
else:
value_priors_to_unit_interval_map[values[i]] = \
[accumulated, accumulated + remaining_portion_bias]
accumulated += remaining_portion_bias
class_and_feature_based_value_priors_to_unit_interval_map[class_name][feature] = \
value_priors_to_unit_interval_map
if self._debug2:
print("\n")
print( "For class " + class_name + \
": Mapping feature value priors for feature '" + \
feature + "' to unit interval: ")
print("\n")
items = value_priors_to_unit_interval_map.items()
for item in items:
print(" " + item[0] + " ===> " + str(item[1]))
ele_index = 0
while (ele_index < how_many_training_samples):
sample_name = "sample" + "_" + str(ele_index)
training_sample_records[sample_name] = []
# Generate class label for this training sample:
import random
ran = random.Random()
roll_the_dice = ran.randint(0,1000) / 1000.0
class_label = ''
for class_name in class_priors_to_unit_interval_map:
v = class_priors_to_unit_interval_map[class_name]
if ( (roll_the_dice >= v[0]) and (roll_the_dice <= v[1]) ):
training_sample_records[sample_name].append(
"class=" + class_name )
class_label = class_name
break
for feature in sorted(list(features_and_values_dict.keys())):
roll_the_dice = ran.randint(0,1000) / 1000.0
value_label = ''
value_priors_to_unit_interval_map = \
class_and_feature_based_value_priors_to_unit_interval_map[class_label][feature]
for value_name in value_priors_to_unit_interval_map:
v = value_priors_to_unit_interval_map[value_name]
if ( (roll_the_dice >= v[0]) and (roll_the_dice <= v[1]) ):
training_sample_records[sample_name].append( \
feature + "=" + value_name )
value_label = value_name
break
ele_index += 1
self._training_sample_records = training_sample_records
if self._debug2:
print("\n\n")
print("TERMINAL DISPLAY OF TRAINING RECORDS:")
print("\n\n")
sample_names = training_sample_records.keys()
sample_names = sorted( sample_names, key=lambda x: int(x.lstrip('sample_')) )
for sample_name in sample_names:
print(sample_name + " = " + str(training_sample_records[sample_name]))
def write_training_data_to_file( self ):
features_and_values_dict = self._features_and_values_dict
class_names = self._class_names
output_file = self._output_datafile
training_sample_records = self._training_sample_records
FILE = open(self._output_datafile, 'w')
features = list( features_and_values_dict.keys() )
features.sort()
title_string = ',class'
for feature_name in features:
title_string += ',' + str(feature_name)
FILE.write(title_string + "\n")
sample_names = list( training_sample_records.keys() )
sample_names = sorted( sample_names, key=lambda x: int(x.lstrip('sample_')) )
record_string = ''
for sample_name in sample_names:
sample_name_string = str(sample_index(sample_name))
record_string += sample_name_string + ','
record = training_sample_records[sample_name]
item_parts_dict = {}
for item in record:
splits = list( filter(None, re.split(r'=', item)) )
item_parts_dict[splits[0]] = splits[1]
record_string += item_parts_dict["class"]
del item_parts_dict["class"]
kees = list(item_parts_dict.keys())
kees.sort()
for kee in kees:
record_string += "," + item_parts_dict[kee]
FILE.write(record_string + "\n")
record_string = ''
FILE.close()
#---------------------------------- Class DTIntrospection ----------------------------------
class DTIntrospection(object):
'''Instances constructed from this class can provide explanations for the
classification decisions at the nodes of a decision tree.
When used in the interactive mode, the decision-tree introspection made possible
by this class provides answers to the following three questions: (1) List of the
training samples that fall in the portion of the feature space that corresponds
to a node of the decision tree; (2) The probabilities associated with the last
feature test that led to the node; and (3) The class probabilities predicated on
just the last feature test on the path to that node.
CAVEAT: It is possible for a node to exist even when there are no training
samples in the portion of the feature space that corresponds to the node. That
is because a decision tree is based on the probability densities estimated from
the training data. When training data is non-uniformly distributed, it is
possible for the probability associated with a point in the feature space to be
non-zero even when there are no training samples at or in the vicinity of that
point.
The fact that a node can exist even when there are no training samples in its
portion of the feature space is an indication of the generalization ability of
decision-tree based classification.
When used in a non-interactive mode, an instance of this class can be used to
create a tabular display that shows what training samples belong directly to the
portion of the feature space that corresponds to each node of the decision tree.
An instance of this class can also construct a tabular display that shows how the
influence of each training sample propagates in the decision tree. For each
training sample, this display first shows the list of nodes that came into
existence through feature test(s) that used the data provided by that sample.
This list for each training sample is followed by a subtree of the nodes that owe
their existence indirectly to the training sample. A training sample influences a
node indirectly if the node is a descendant of another node that is affected
directly by the training sample.
'''
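# A minimal usage sketch (this assumes that `dt' is a DecisionTree instance on
# which construct_decision_tree_classifier() has already been called, so that the
# root node is available to the introspector):
#
#     introspector = DTIntrospection(dt)
#     introspector.initialize()
#     introspector.display_training_samples_at_all_nodes_direct_influence_only()
#     introspector.display_training_samples_to_nodes_influence_propagation()
#     introspector.explain_classifications_at_multiple_nodes_interactively()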
def __init__(self, dt):
if not isinstance(dt, DecisionTree):
    raise TypeError("The argument supplied to the DTIntrospection constructor must be of type DecisionTree")
self._dt = dt
self._root_dtnode = dt._root_node
self._samples_at_nodes_dict = {}
self._branch_features_to_nodes_dict = {}
self._sample_to_node_mapping_direct_dict = {}
self._node_serial_num_to_node_dict = {}
self._awareness_raising_msg_shown = 0
self._debug = 0
def initialize(self):
if self._root_dtnode is None:
raise SyntaxError("You must first construct the decision tree before using the DT Introspection class.")
root_node = self._root_dtnode
self.recursive_descent(root_node)
def recursive_descent(self, node):
node_serial_number = node.get_serial_num()
self._node_serial_num_to_node_dict[node_serial_number] = node
branch_features_and_values_or_thresholds = node.get_branch_features_and_values_or_thresholds()
if self._debug:
print("\nat node " + str(node_serial_number) + ": the branch features and values are: " + str(branch_features_and_values_or_thresholds))
self._branch_features_to_nodes_dict[node_serial_number] = branch_features_and_values_or_thresholds
samples_at_node = None
for item in branch_features_and_values_or_thresholds:
samples_for_feature_value_combo = self.get_samples_for_feature_value_combo(item)
samples_at_node = samples_for_feature_value_combo if samples_at_node is None else \
[sample for sample in samples_at_node if sample in samples_for_feature_value_combo]
if (samples_at_node is not None) and (len(samples_at_node) > 0):
samples_at_node = sorted(samples_at_node, key = lambda x: sample_index(x))
if self._debug:
print("Node: " + str(node_serial_number) + " the samples are: " + str(samples_at_node))
self._samples_at_nodes_dict[node_serial_number] = samples_at_node
if samples_at_node is not None:
for sample in samples_at_node:
if sample not in self._sample_to_node_mapping_direct_dict:
self._sample_to_node_mapping_direct_dict[sample] = [node_serial_number]
else:
self._sample_to_node_mapping_direct_dict[sample].append(node_serial_number)
children = node.get_children()
list(map(lambda x: self.recursive_descent(x), children))
def display_training_samples_at_all_nodes_direct_influence_only(self):
if self._root_dtnode is None:
raise SyntaxError("You must first construct the decision tree before using the DT Introspection class.")
root_node = self._root_dtnode
self.recursive_descent_for_showing_samples_at_a_node(root_node)
def recursive_descent_for_showing_samples_at_a_node(self, node):
node_serial_number = node.get_serial_num()
branch_features_and_values_or_thresholds = node.get_branch_features_and_values_or_thresholds()
if node_serial_number in self._samples_at_nodes_dict:
if self._debug:
print("\nat node " + str(node_serial_number) + ": the branch features and values are: " + str(branch_features_and_values_or_thresholds))
print("Node " + str(node_serial_number) + ": the samples are: " + str(self._samples_at_nodes_dict[node_serial_number]))
children = node.get_children()
list(map(lambda x: self.recursive_descent_for_showing_samples_at_a_node(x), children))
def display_training_samples_to_nodes_influence_propagation(self):
for sample in sorted(self._dt._training_data_dict, key = lambda x: sample_index(x)):
if sample in self._sample_to_node_mapping_direct_dict:
nodes_directly_affected = self._sample_to_node_mapping_direct_dict[sample]
print("\n" + sample + ":\n" + " nodes affected directly: " + str(nodes_directly_affected))
print(" nodes affected through probabilistic generalization:")
list(map(lambda x: self.recursive_descent_for_sample_to_node_influence(x, nodes_directly_affected, " "), nodes_directly_affected))
def recursive_descent_for_sample_to_node_influence(self, node_serial_num, nodes_already_accounted_for, offset ):
offset += " "
node = self._node_serial_num_to_node_dict[node_serial_num]
children = [x.get_serial_num() for x in node.get_children()]
children_affected = [child for child in children if child not in nodes_already_accounted_for]
if len(children_affected) > 0:
print(offset + str(node_serial_num) + "=> " + str(children_affected))
list(map(lambda x: self.recursive_descent_for_sample_to_node_influence(x, children_affected, offset), children_affected))
def get_samples_for_feature_value_combo(self, feature_value_combo):
feature,op,value = self.extract_feature_op_val(feature_value_combo)
samples = []
if op == '=':
samples = [sample for sample in self._dt._training_data_dict if feature_value_combo in \
self._dt._training_data_dict[sample]]
elif op == '<':
for sample in self._dt._training_data_dict:
features_and_values = self._dt._training_data_dict[sample]
for item in features_and_values:
feature_data,op_data,val_data = self.extract_feature_op_val(item)
value = convert(value)
val_data = convert(val_data)
if isinstance(val_data, (int,float)) and (feature == feature_data) and (val_data <= value):
samples.append(sample)
break
elif op == '>':
for sample in self._dt._training_data_dict:
features_and_values = self._dt._training_data_dict[sample]
for item in features_and_values:
feature_data,op_data,val_data = self.extract_feature_op_val(item)
value = convert(value)
val_data = convert(val_data)
if isinstance(val_data, (int,float)) and (feature == feature_data) and (val_data > value):
samples.append(sample)
break
else:
raise SyntaxError("Something is wrong with the feature-value syntax")
return samples
def extract_feature_op_val(self, feature_value_combo):
pattern1 = r'(.+)=(.+)'
pattern2 = r'(.+)<(.+)'
pattern3 = r'(.+)>(.+)'
feature = value = op = None
if re.search(pattern2, feature_value_combo):
m = re.search(pattern2, feature_value_combo)
feature,op,value = m.group(1),'<',m.group(2)
elif re.search(pattern3, feature_value_combo):
m = re.search(pattern3, feature_value_combo)
feature,op,value = m.group(1),'>',m.group(2)
else:
m = re.search(pattern1, feature_value_combo)
feature,op,value = m.group(1),'=',m.group(2)
return (feature,op,value)
def explain_classifications_at_multiple_nodes_interactively(self):
if not self._samples_at_nodes_dict:
raise SyntaxError('''You called explain_classifications_at_multiple_nodes_interactively() without '''
'''first initializing the DTIntrospection instance in your code. Aborting.''')
print("\n\nIn order for the decision tree to introspect\n")
msg = '''DO YOU ACCEPT the fact that, in general, a region of the feature space
that corresponds to a node may have NON-ZERO probabilities associated
with it even when there are NO training data points in that region?
Enter 'y' for yes or any other character for no: '''
ans = None
if sys.version_info[0] == 3:
ans = input(msg)
else:
ans = raw_input(msg)
ans = ans.strip()
if (ans != 'y') and (ans != 'yes'):
raise Exception("\n Since you answered 'no' to a very real theoretical possibility, no explanations possible for the classification decisions in the decision tree. Aborting")
self._awareness_raising_msg_shown = 1
while True:
node_id = None
while True:
ans = None
if sys.version_info[0] == 3:
ans = input("\nEnter the integer ID of a node: ")
else:
ans = raw_input("\nEnter the integer ID of a node: ")
ans = ans.strip()
if ans == 'exit': return
try:
ans = int(ans)
except ValueError:
print("\nWhat you entered does not look like an integer ID for a node. Aborting!")
raise
if ans in self._samples_at_nodes_dict:
node_id = ans
break
else:
print("\nYour answer must be an integer ID of a node. Try again or enter 'exit'.")
self.explain_classification_at_one_node(node_id)
def explain_classification_at_one_node(self, node_id):
if not self._samples_at_nodes_dict:
raise SyntaxError('''You called explain_classification_at_one_node() without first initializing '''
'''the DTIntrospection instance in your code. Aborting."''')
if node_id not in self._samples_at_nodes_dict:
print("Node %d is not a node in the tree" % node_id)
return
if node_id == 0:
print("Nothing useful to be explained at the root node")
return
if not self._awareness_raising_msg_shown:
print("\n\nIn order for the decision tree to introspect at Node %d: \n" % node_id)
msg = " DO YOU ACCEPT the fact that, in general, a region of the feature space\n" + \
" that corresponds to a DT node may have NON-ZERO probabilities associated\n" + \
" with it even when there are NO training data points in that region?\n" + \
"\nEnter 'y' for yes or any other character for no: "
ans = None
if sys.version_info[0] == 3:
ans = input(msg)
else:
ans = raw_input(msg)
ans = ans.strip()
if (ans != 'y') and (ans != 'yes'):
raise Exception("\n Since you answered 'no' to a very real theoretical possibility, no explanations possible for the classification decision at node %d" % node_id)
samples_at_node = self._samples_at_nodes_dict[node_id]
branch_features_to_node = self._branch_features_to_nodes_dict[node_id]
class_names = self._root_dtnode.get_class_names()
class_probabilities = self._root_dtnode.get_class_probabilities()
feature,op,value = self.extract_feature_op_val( branch_features_to_node[-1] )
if len(samples_at_node) > 0:
msg2 = "\n Samples in the portion of the feature space assigned to Node %d: %s\n" % (node_id, str(samples_at_node))
else:
msg2 = "\n\n There are NO training data samples directly in the region of the feature space assigned to node %d.\n" % node_id
msg2 += "\n Features tests on the branch to node %d: %s\n" % (node_id, str(branch_features_to_node))
msg2 += "\n Would you like to see the probability associated with the last feature test on the branch leading to Node %d\n?" % node_id
msg2 += "\n\n Enter 'y' if yes and `n' if no: "
ans = None
if sys.version_info[0] == 3:
ans = input(msg2)
else:
ans = raw_input(msg2)
ans = ans.strip()
if (ans == 'y') or (ans == 'yes'):
sequence = [ branch_features_to_node[-1] ]
prob = self._dt.probability_of_a_sequence_of_features_and_values_or_thresholds(sequence)
print("\n Probability of %s is: %s" % (str(sequence), str(prob)))
msg3 = "\n Using Bayes rule, would you like to see the class probabilities predicated on just the last feature test on the branch leading to Node %d\n?" % node_id
msg3 += "\n\n Enter 'y' if yes and `n' if no: "
ans = None
if sys.version_info[0] == 3:
ans = input(msg3)
else:
ans = raw_input(msg3)
ans = ans.strip()
if (ans == 'y') or (ans == 'yes'):
sequence = [ branch_features_to_node[-1] ]
for cls in class_names:
prob = self._dt.probability_of_a_class_given_sequence_of_features_and_values_or_thresholds(cls, sequence)
print("\n Probability of class %s given just one feature test %s is: %s" % (cls, str(sequence), str(prob)))
else:
print("goodbye")
print("\nFinished supplying information on Node %d\n\n" % node_id)
#------------------------------------ Test Code Follows -------------------------------------
if __name__ == '__main__':
dt = DecisionTree( training_datafile = "Examples/training_symbolic.csv",
csv_class_column_index = 1,
csv_columns_for_features = [2,3,4,5],
max_depth_desired = 5,
entropy_threshold = 0.1,
)
dt.get_training_data()
dt.show_training_data()
prob = dt.prior_probability_for_class( 'class=benign' )
print("prior for benign: ", prob)
prob = dt.prior_probability_for_class( 'class=malignant' )
print("prior for malignant: ", prob)
prob = dt.probability_of_feature_value( 'smoking', 'heavy')
print(prob)
dt.determine_data_condition()
root_node = dt.construct_decision_tree_classifier()
root_node.display_decision_tree(" ")
test_sample = ['exercising=never', 'smoking=heavy', 'fatIntake=heavy', 'videoAddiction=heavy']
classification = dt.classify(root_node, test_sample)
print("Classification: " + str(classification))
test_sample = ['videoAddiction=none', 'exercising=occasionally', 'smoking=never', 'fatIntake=medium']
classification = dt.classify(root_node, test_sample)
print("Classification: " + str(classification))
print("Number of nodes created: " + str(root_node.how_many_nodes()))