# Author: Guilherme Aldeia
# Contact: guilherme.aldeia@ufabc.edu.br
# Version: 1.0.9
# Last modified: 09-03-2021 by Guilherme Aldeia
"""Base class to be inherited for classification and regression tasks."""
import time
import warnings
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_random_state
from itea._manipulators.generator import uniform
from itea._manipulators.mutation import mutate_individual
from itea._manipulators.sanitizer import sanitize
from itea.inspection import ITExpr_explainer
import itea._manipulators.simplifier as simplifiers
[docs]class BaseITEA(BaseEstimator):
"""Base class to be inherited for classification and regression tasks.
This class implements argument checks and generic evolutionary methods
(population initialization, selection, mutation, and evolution), along with
three virtual methods to be implemented.
Ideally, this class should never be instantiated, only its derivations.
Its derivations will be scikit estimators and can be used in many scikit
methods such as grid-search or pipelines.
Every argument is a named argument. The list of arguments includes
everything that an ``ITExpr`` class needs to be instantiated.
All arguments have a default value. In this configuration, the
evolutionary process will search only for polynomials.
"""
def __init__(self, *,
gens = 100,
popsize = 100,
expolim = (-2, 2),
max_terms = 5,
simplify_method = None,
random_state = None,
verbose = None,
labels = [],
tfuncs = {'id': lambda x: x},
tfuncs_dx = None,
predictor_kw = None
):
"""Constructor method.
Parameters
----------
gens : int, default=100
number of generations of the evolutionary process. The
algorithm does not implement an early stop mechanism, so
it is guaranteed that the algorithm will perform the exact
number of generations.
popsize : int, default=100
population size, consistent through each generation.
expolim : tuple (int, int), default = (-2, 2)
tuple containing two integers, specifying the bounds
of exponents that can be explored through the evolution.
max_terms : int, default=5
the max number of IT terms allowed.
simplify_method : string or None, default=None
String with the name of the simplification method to be used
before fitting expressions through the evolutionary process.
When set to None, the simplification step is disabled.
Simplification can impact performance. To be simplified, the
expression must be previously fitted. After the simplification, if
the expression was changed, it should be fitted again to better
adjust the coefficients and intercept to the new IT expressions'
structure.
random_state : int, None or numpy.random_state, default=None
int or numpy random state. Use this argument
to have reproducible results across different
executions. When None, a random state instance
will be created and used and can be accessed
by ``itea.random_state``.
verbose : int, None or False, default=None
specify if the algorithm should perform the evolution
silently or if it should print information through the
process. When verbose is None, False, or 0, the algorithm
will not print any information. If verbose is an integer
``n``, then every ``n`` generations the algorithm will
print the status of the generation. If verbose is set
to -1, every generation information will be printed.
labels : list of strings, default=[]
(``ITExpr`` parameter) list containing the labels of the
data that will be used in the evolutionary process, and
will be used in ``ITExpr`` constructors.
tfuncs : dict, default={'id': lambda x: x}
(``ITExpr`` parameter) transformations functions to be
used when creating ``ITExpr`` 's during the
evolutionary process. Should always be a dict where the
keys are the names of the transformation functions and
the values are unary vectorized functions (for example,
numpy functions). For user-defined functions, see
numpy.vectorize for more information on how to vectorize
your transformation functions. Defaults to a dict with
only the identity function.
tfuncs_dx : dict, default=None
(ITExpr_explainer parameter) derivatives of the
given transformations functions, following the same scheme:
a dictionary where the key is the name of the function
(should have the derivatives of every function in
tfuncs) and the value is a vectorized function
representing its derivative. When set to None, the
itea package will use automatic differentiation
through jax to create the derivatives.
predictor_kw : dict or None, default = None
dictionary with parameters to pass as named arguments
to the constructor method in the ``BaseITExpr`` subclass.
If none is given, then a empty dict will be used.
"""
self.gens = gens
self.popsize = popsize
self.max_terms = max_terms
self.expolim = expolim
self.tfuncs = tfuncs
self.tfuncs_dx = tfuncs_dx
self.random_state = random_state
self.labels = labels
self.verbose = verbose
self.simplify_method = simplify_method
self.predictor_kw = predictor_kw
# This should always be none for the base class, so the default
# fitness function for each task (regression/classification) is
# correctly used.
self.fitness_f = None
def _check_args(self, X, y):
"""This method provides a simple verification of the arguments to be
used as a baseline.
The sub-classes of the BaseITEA should implement the check_args as well.
It is important to notice that the check must be made when fitting and
should raise errors to stop the program flow if any problem is found.
The scikit recomendation is to never do checks on __init__.
Raises
------
ValueError
If one or more arguments would result in a invalid execution of
itea.
Notes
-----
As the scikit documentation suggests, no check for valid arguments is
made on the constructor. Instead, when those arguments will be used,
we then perform the checkings. **All 'private' methods (beginning with
an underscore) are designed to work after the ``check_args`` is called,
since they rely on valid parameters. Also, all of them are intended
to internal usage. When modifying, calling them directly, or testing
the private methods, you should manually call the check args.
"""
if self.expolim[1] < self.expolim[0]:
raise ValueError(
"Lower expolim bound is greater than upper bound.")
if self.max_terms < 1:
raise ValueError("max_terms should be greater or equal to 1.")
for bound in self.expolim:
if not np.issubdtype(type(bound), int):
raise ValueError(
f"the expolim bounds {bound} must be integers.")
if not np.issubdtype(type(self.max_terms), int):
raise ValueError(f"max_terms should be a int.")
if self.simplify_method is not None:
if not self.simplify_method in simplifiers.__all__:
raise ValueError(
f"simplify_method {self.simplify_method} does not exist. "
f"Available methods: {simplifiers.__all__}")
if 'id' not in list(self.tfuncs.keys()):
warnings.warn("It is necessary to provide an identity function "
"with name 'id' on the tfuncs dict, and I didn't found it. I will "
"insert ``'id' : lambda x: x`` on the dict.")
self.tfuncs['id'] = lambda x: x
self.labels = np.array([self.labels]).flatten()
if len(self.labels) != len(X[0]):
warnings.warn("The labels vector does not have the same length as "
"the number of variables in X (or was not provided). labels "
f"has length {len(self.labels)}, and X has {len(X[0])} variables. "
"labels will be generated as [x_0, x_1, ...].")
self.labels = [f'x_{i}' for i in range(len(X[0]))]
if self.predictor_kw == None:
self.predictor_kw = {}
def _create_population(
self, *, simplify_f, nvars, itexpr_class, X, y, random_state):
"""Method to create an initial population for the evolutionary process.
It will use an random expression generator that does not create
trivial expressions (where all exponents are zero).
Although, if the user has chosen an simplification method, exists the
possibility that the initial population will have fewer individuals
than the given popsize. The while loop tries to guarantee that we will
start with a clean population where all fitnessess are finite values.
"""
generator = uniform(
self.max_terms, self.expolim, self.tfuncs, nvars, random_state)
# The loop below ensures that the first population is always
# composed of valud expressions with finite fitness.
pop = []
while(len(pop) < self.popsize):
expr = sanitize(next(generator))
itexpr = itexpr_class(
expr=sanitize(expr), tfuncs=self.tfuncs, labels=self.labels,
fitness_f = self.fitness_f, **self.predictor_kw
)
with np.errstate(all='ignore'):
itexpr.fit(X, y)
if simplify_f is not None:
itexpr = simplify_f(itexpr=itexpr, X=X)
itexpr.fit(X, y)
if np.isfinite(itexpr._fitness):
pop.append(itexpr)
return pop
def _mutate_population(self, *, pop, nvars, itexpr_class, random_state):
"""Method to mutate the population without changing its parents.
The mutated children will not be fitted. The fit of the ITExpr occurs
only when the selection method faces an unfitted ITExpr.
"""
mutated = [mutate_individual(p.expr, self.max_terms, self.expolim,
self.tfuncs, nvars, random_state) for p in pop]
newpop = [itexpr_class(
expr = sanitize(expr), tfuncs = self.tfuncs,
labels = self.labels,
fitness_f = self.fitness_f, **self.predictor_kw
) for expr in mutated]
return newpop
def _select_population(self, *,
pop, select_f, simplify_f, size, X, y, random_state):
"""Method to perform multiple tournament selections, until the number
of selected expressions is equal to the popsize.
After selection, all individuals in the returned population are fitted,
but not all individuals in the passed population will be as well.
"""
# Invalid expressions can happen. We'll ignore the warnings just here
with np.errstate(all='ignore'):
# Selecting the competitors indexes. Even if there is only 1
# individual in the population, the mutation will create a variation
# and the population passed as argument (which is assumed to be
# the concatenation of the original population and its mutated
# children) should have at least 2 individuals to compete.
competitors_idx = random_state.choice(len(pop), size=(size, 2))
# Finding the unique expression in population that were selected
# (to avoid unecessary fits)
to_fit = np.unique(competitors_idx)
# Simplify functions changes the expressions, we need to ensure
# they will be fitted after the process
if simplify_f is not None:
for i in to_fit:
pop[i] = simplify_f(itexpr=pop[i].fit(X, y), X=X)
for i in to_fit:
pop[i].fit(X, y)
return [select_f(comp) for comp in np.take(pop, competitors_idx)]
def _evolve(self, X, y, itexpr_class, greater_is_better):
"""Evolution process on an ITExpr population.
Should be used on sub-classes, inside the fit function, to evolve the
population.
"""
# Getting ready...
nvars = X.shape[1]
random_state = check_random_state(self.random_state)
# Takes an array of competitors and returns the most valuable to the
# task
if greater_is_better:
# making sure infinite fitness are negative
select_f = lambda comp: comp[np.argmax(
[c._fitness if np.isfinite(c._fitness) else -np.inf for c in comp])]
else: # smaller is better. By default invalid itexprs have +inf fitness
select_f = lambda comp: comp[np.argmin([c._fitness for c in comp])]
if self.simplify_method is not None:
simplify_f = getattr(simplifiers, self.simplify_method)
else:
simplify_f = None
groups = ['fitness', 'n_terms', 'complexity']
columns = ['min', 'mean', 'std', 'max']
self.convergence_ = {
group:{col:[] for col in columns} for group in groups}
# Starting the evolution now!
self.exectime_ = time.time()
pop = self._create_population(
simplify_f = simplify_f,
nvars = nvars,
itexpr_class = itexpr_class,
X = X,
y = y,
random_state = random_state)
if self.verbose:
print("gen | smallest fitness | mean fitness | highest fitness | "
"remaining time")
print("-"*76)
# Estimation of remaining time using a circular list
last_5_times = np.full(shape=(5), fill_value = np.nan, dtype=float)
for g in range(self.gens):
t = time.time()
child = self._mutate_population(
pop = pop,
nvars = nvars,
itexpr_class = itexpr_class,
random_state = random_state)
pop = self._select_population(
pop = pop + child,
size = self.popsize,
select_f = select_f,
simplify_f = simplify_f,
X = X,
y = y,
random_state = random_state)
# After selection, all individuals in population are fitted
# and have finite fitness
fitnesses = [p._fitness for p in pop]
n_terms = [p.n_terms for p in pop]
complexities = [p.complexity() for p in pop]
for group, data in zip(groups, [fitnesses, n_terms, complexities]):
data_masked = np.ma.masked_invalid(data)
self.convergence_[group]['min' ].append(np.min(data_masked))
self.convergence_[group]['max' ].append(np.max(data_masked))
self.convergence_[group]['mean'].append(np.mean(data_masked))
self.convergence_[group]['std' ].append(np.std(data_masked))
if (self.verbose and g%self.verbose==0) or self.verbose==-1:
# Estimating remaining time
last_5_times[g%5] = time.time() - t
remaining = int(np.ceil(
np.nanmean(last_5_times) * (self.gens - g - 1)))
remaining_str = f"{remaining//60}min{remaining % 60}sec"
print("{:3d} | {:16.6f} | {:12.6f} | {:15.6f} | {:12s}".format(
g, np.min(fitnesses), np.mean(fitnesses),
np.max(fitnesses), remaining_str
))
self.exectime_ = time.time() - self.exectime_
# At this point, all individuals in the population are fitted.
if greater_is_better:
# making sure infinite fitness are negative
return pop[np.argmax(
[p._fitness if np.isfinite(p._fitness) else -np.inf for p in pop])]
else: # smaller is better. By default invalid itexprs have +inf fitness
return pop[np.argmin([p._fitness for p in pop])]
def _explain_bestsol(self, itexpr, X, y):
"""Estimating feature importantes using the partial effect of the
final best solution.
After the evolution process, this method should be called to create
the feature_importances on the expression
"""
explainer = ITExpr_explainer(
itexpr=itexpr, tfuncs=self.tfuncs, tfuncs_dx=self.tfuncs_dx
).fit(X, y)
itexpr.selected_features_ = explainer.selected_features()
itexpr.feature_importances_ = explainer.average_partial_effects(X)
self.feature_importances_ = itexpr.feature_importances_
[docs] def fit(self, X, y):
"""virtual fit method. Should be overridden by sub-classes.
"""
# The subclasses must do:
# 1 - check_args
# 2 - run the evolution (with _evolve())
# 3 - retrieve the best solution
# 4 - calculate the feature importances of the best solution
raise NotImplementedError()
[docs] def predict(self, X):
"""virtual predict method. Should be overridden by sub-classes.
"""
raise NotImplementedError()
[docs] def predict_proba(self, X):
"""virtual predict_proba method. Should be overridden by sub-classes.
"""
raise NotImplementedError()