Source code for Solid.ParticleSwarm

from abc import ABCMeta, abstractmethod
from random import random
from numpy import apply_along_axis, argmin, array, copy, diag_indices_from, dot, zeros
from numpy.random import uniform


[docs]class ParticleSwarm: """ Conducts particle swarm optimization """ __metaclass__ = ABCMeta swarm_size = None member_size = None lower_bound = None upper_bound = None pos = None vel = None scores = None best = None global_best = None c1 = None c2 = None c3 = None cur_steps = None max_steps = None min_objective = None def __init__(self, swarm_size, member_size, lower_bound, upper_bound, c1, c2, c3, max_steps, min_objective=None): """ :param swarm_size: number of members in swarm :param member_size: number of components per member vector :param lower_bound: list of lower bounds, where ith element is ith lower bound :param upper_bound: list of upper bounds, where ith element is ith upper bound :param c1: constant for 1st term in velocity calculation :param c2: contsant for 2nd term in velocity calculation :param c3: constant for 3rd term in velocity calculation :param max_steps: maximum steps to run algorithm for :param min_objective: objective function value to stop algorithm once reached """ if isinstance(swarm_size, int) and swarm_size > 0: self.swarm_size = swarm_size else: raise ValueError('Swarm size must be a positive integer') if isinstance(member_size, int) and member_size > 0: self.member_size = member_size else: raise ValueError('Member size must be a positive integer') if all([isinstance(x, (int, float)) for x in lower_bound]): self.lower_bound = array([float(x) for x in lower_bound]) else: raise ValueError('Lower bounds must be numeric types') if all([isinstance(x, (int, float)) for x in upper_bound]): self.upper_bound = array([float(x) for x in upper_bound]) else: raise ValueError('Upper bounds must be numeric types') self.pos = uniform(self.lower_bound, self.upper_bound, size=(swarm_size, member_size)) self.vel = uniform(self.lower_bound - self.upper_bound, self.upper_bound - self.lower_bound, size=(swarm_size, member_size)) self.best = copy(self.pos) if isinstance(c1, (int, float)) and isinstance(c2, (int, float)) and isinstance(c3, (int, float)): self.c1 = float(c1) self.c2 = float(c2) self.c3 = float(c3) else: raise ValueError() if isinstance(max_steps, int): self.max_steps = max_steps else: raise ValueError() if min_objective is not None: if isinstance(min_objective, (int, float)): self.min_objective = float(min_objective) else: raise ValueError() def __str__(self): return ('PARTICLE SWARM: \n' + 'CURRENT STEPS: %d \n' + 'BEST FITNESS: %f \n' + 'BEST MEMBER: %s \n\n') % \ (self.cur_steps, self._objective(self.global_best[0]), str(self.global_best[0])) def __repr__(self): return self.__str__() def _clear(self): """ Resets the variables that are altered on a per-run basis of the algorithm :return: None """ self.pos = uniform(self.lower_bound, self.upper_bound, size=(self.swarm_size, self.member_size)) self.vel = uniform(self.lower_bound - self.upper_bound, self.upper_bound - self.lower_bound, size=(self.swarm_size, self.member_size)) self.scores = self._score(self.pos) self.best = copy(self.pos) self.cur_steps = 0 self._global_best() @abstractmethod def _objective(self, member): """ Returns objective function value for a member of swarm - operates on 1D numpy array :param member: a member :return: objective function value of member """ pass def _score(self, pos): """ Applies objective function to all members of swarm :param pos: position matrix :return: score vector """ return apply_along_axis(self._objective, 1, pos) def _best(self, old, new): """ Finds the best objective function values for each member of swarm :param old: old values :param new: new values :return: None """ old_scores = self._score(old) new_scores = self._score(new) best = [] for i in range(len(old_scores)): if old_scores[i] < new_scores[i]: best.append(old[i]) else: best.append(new[i]) self.best = array(best) def _global_best(self): """ Finds the global best across swarm :return: None """ if self.global_best is None or min(self.scores) < self._objective(self.global_best[0]): self.global_best = array([self.pos[argmin(self.scores)],] * self.swarm_size)
[docs] def run(self, verbose=True): """ Conducts particle swarm optimization :param verbose: indicates whether or not to print progress regularly :return: best member of swarm and objective function value of best member of swarm """ self._clear() for i in range(self.max_steps): self.cur_steps += 1 if ((i + 1) % 100 == 0) and verbose: print self u1 = zeros((self.swarm_size, self.swarm_size)) u1[diag_indices_from(u1)] = [random() for x in range(self.swarm_size)] u2 = zeros((self.swarm_size, self.swarm_size)) u2[diag_indices_from(u2)] = [random() for x in range(self.swarm_size)] vel_new = (self.c1 * self.vel) + \ (self.c2 * dot(u1, (self.best - self.pos))) + \ (self.c3 * dot(u2, (self.global_best - self.pos))) pos_new = self.pos + vel_new self._best(self.pos, pos_new) self.pos = pos_new self.scores = self._score(self.pos) self._global_best() if self._objective(self.global_best[0]) < self.min_objective: print "TERMINATING - REACHED MINIMUM OBJECTIVE" return self.global_best[0], self._objective(self.global_best[0]) print "TERMINATING - REACHED MAXIMUM STEPS" return self.global_best[0], self._objective(self.global_best[0])