|
@@ -0,0 +1,525 @@
|
|
|
+import numpy as np
|
|
|
try:
    from scipy.stats import mode
except ImportError:
    # Pure-numpy fallback mirroring the classic scipy.stats.mode behavior:
    # returns (modal value, count) with the reduced axis kept as length 1.
    def mode(a, axis=0):
        """Return the most frequent value along *axis* of array *a*.

        :param a: array-like of values.
        :param axis: axis along which to compute the mode (default 0).
        :return: tuple ``(mostfrequent, counts)`` where both arrays have
            ``a.shape`` with ``shape[axis] == 1``. Ties resolve to the
            smaller value because scores are scanned in ascending order
            and only a strictly greater count replaces the current mode.
        """
        scores = np.unique(np.ravel(a))  # get ALL unique values
        testshape = list(a.shape)
        testshape[axis] = 1
        oldmostfreq = np.zeros(testshape)
        oldcounts = np.zeros(testshape)

        # BUG FIX: bind the result before the loop so an empty input returns
        # (zeros, zeros) instead of raising NameError (previously
        # `mostfrequent` was only assigned inside the loop body).
        mostfrequent = oldmostfreq

        for score in scores:
            template = (a == score)
            counts = np.expand_dims(np.sum(template, axis), axis)
            mostfrequent = np.where(counts > oldcounts, score, oldmostfreq)
            oldcounts = np.maximum(counts, oldcounts)
            oldmostfreq = mostfrequent

        return mostfrequent, oldcounts
|
|
|
+import multiprocessing as mp
|
|
|
+from time import time
|
|
|
+from copy import deepcopy
|
|
|
+from crowd_labeling.logratio_transformations import \
|
|
|
+ centered_log_ratio_transform as clr, \
|
|
|
+ isometric_log_ratio_transform as ilr, \
|
|
|
+ additive_log_ratio_transform as alr, \
|
|
|
+ make_projection_matrix as mpm
|
|
|
+
|
|
|
+
|
|
|
class CLLDA:
    """
    The :class: CLLDA is a python implementation of Crowd Labeling Latent Dirichlet Allocation.

    This algorithm processes crowd labeling (aka crowd consensus) data where workers label instances
    as pertaining to one or more classes. Allows for calculating resulting label estimates and
    covariances in multiple log-ratio transformed spaces.

    After construction (which runs inference immediately) the quantities of interest are:
      self.labels / self.labels_cov -- per-transform lists of posterior label means and covariances,
      self.worker_mats -- per-worker C x R confusion-matrix estimates,
      self.LL -- data log-likelihood recorded at every epoch.
    """

    def __init__(self, votes, workers, instances, vote_ids=None, worker_ids=None, instance_ids=None,
                 worker_prior=None, instance_prior=None, transform=None,
                 num_epochs=1000, burn_in=200, updateable=True, save_samples=False, seed=None):

        """
        Initializes settings for the model and automatically calls the inference function.

        :param votes: List of vote values.
        :param workers: List of uses who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param vote_ids: (optional) List of vote ids. If provided, :param votes should be a list of integers.
        :param instance_ids: (optional) List of instance ids. If provided, :param instances should be a list of integers.
        :param worker_ids: (optional) List of worker ids. If provided, :param workers should be a list of integers.
        :param worker_prior: (optional) Matrix prior for worker skill (pseudovotes).
        :param instance_prior: (optional) List of class priors (pseudovotes)
        :param transform: log-ratio transform to use.
        :param num_epochs: number of epochs to run for.
        :param burn_in: number of epochs to ignore for convergence of the Gibbs chain.
        :param updateable: If True, will save vote-classes between runs at the expense of memory.
        :param save_samples: option to save vote-classes after each epoch (very memory intensive).
        :param seed: seed for the random number generator if reproducibility is desired.
        """

        # set random seed (a random one is drawn when not supplied, so separate runs differ)
        if seed is None:
            seed = np.random.randint(int(1e8))
        self.rng = np.random.RandomState(seed=seed)

        # data info and priors
        self.V = len(votes)                 # number of votes
        self.U = len(np.unique(workers))    # number of distinct workers
        self.I = len(np.unique(instances))  # number of distinct instances
        if worker_prior is not None:
            self.worker_prior = np.array(worker_prior)
            # a single 2-D prior matrix is shared (tiled) across all workers
            if self.worker_prior.ndim == 2:
                self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])
            self.C = self.worker_prior.shape[1]  # number of latent label classes
            self.R = self.worker_prior.shape[2]  # number of possible responses (vote values)
        else:
            if vote_ids is not None:
                self.C = len(vote_ids)
                self.R = self.C
            else:
                self.R = len(np.unique(votes))
                self.C = self.R
            # default prior: mildly diagonal-dominant confusion matrix (workers
            # assumed better than chance), scaled to 3 pseudovotes per row.
            self.worker_prior = (np.eye(self.R) + np.ones((self.R, self.R)) / self.R) * 3
            self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])
        if instance_prior is None:
            # weak symmetric Dirichlet prior (1/4 pseudovote total per instance)
            self.instance_prior = np.ones(self.C) / self.C / 4
        else:
            # NOTE(review): stored as given; cllda() later calls .sum() on it,
            # so a plain list would fail there -- presumably an ndarray is expected.
            self.instance_prior = instance_prior

        # determine vote IDs; when ids are inferred, remap votes to 0-based indices
        if vote_ids is None:
            self.vote_ids = np.unique(votes)
            vote_dict = {y: x for x, y in enumerate(self.vote_ids)}
            votes = np.array([vote_dict[x] for x in votes])
        else:
            self.vote_ids = vote_ids
        # determine instance IDs
        if instance_ids is None:
            self.instance_ids = np.unique(instances)
            instance_dict = {y: x for x, y in enumerate(self.instance_ids)}
            instances = np.array([instance_dict[x] for x in instances])
        else:
            self.instance_ids = instance_ids
        # determine worker IDs
        if worker_ids is None:
            self.worker_ids = np.unique(workers)
            worker_dict = {y: x for x, y in enumerate(self.worker_ids)}
            workers = np.array([worker_dict[x] for x in workers])
        else:
            self.worker_ids = worker_ids

        # cl_transform info: normalize to a tuple so several transforms can be
        # tracked simultaneously (a bare string/None becomes a 1-tuple)
        if not isinstance(transform, str) and hasattr(transform, '__iter__'):
            self.transform = tuple(transform)
        else:
            self.transform = (transform,)

        # Gibbs sampling parameters
        self.num_epochs = num_epochs
        self.burn_in = burn_in
        self.num_samples = num_epochs - burn_in  # post-burn-in samples kept in the averages

        # info to save
        self.LL = np.nan * np.ones(self.num_epochs)
        self.worker_mats = np.zeros((self.U, self.C, self.R))
        self.labels, self.labels_cov = list(), list()
        for transform in self.transform:
            # clr keeps C components; alr/ilr drop one degree of freedom
            if transform in (None, 'none', 'clr'):
                self.labels.append(np.zeros((self.I, self.C)))
                self.labels_cov.append(np.zeros((self.I, self.C, self.C)))
            elif transform in ('alr', 'ilr'):
                self.labels.append(np.zeros((self.I, self.C - 1)))
                self.labels_cov.append(np.zeros((self.I, self.C - 1, self.C - 1)))
            else:
                raise Exception('Unknown transform!')
        if save_samples:
            # NOTE(review): sized for a (C-1)-dim transform; with 'none'/'clr'
            # (C columns) the per-epoch write in update_labels would not fit -- confirm.
            self.samples = np.zeros((self.num_epochs - self.burn_in, self.I, self.C - 1))
        self.updateable = updateable
        self.vote_classes = None

        # estimate label means and covariances using cllda
        self.cllda(votes, workers, instances)

        # clean up: drop per-vote state unless future update() calls are expected
        if not self.updateable:
            self.vote_classes = None
        else:
            # keep the (remapped, integer) inputs so update() can detect reruns
            self.votes = votes
            self.instances = instances
            self.workers = workers

    # CLLDA optimization using Gibbs sampling
    def cllda(self, votes, workers, instances, starting_epoch=0):
        """
        Performs inference on the :class: CLLDA model.

        :param votes: List of vote values.
        :param workers: List of workers who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param starting_epoch: How many epochs have already been incorporated in the averages.
        """

        # precalculate prior normalizers used in every likelihood evaluation
        worker_prior_sum = self.worker_prior.sum(axis=2)
        instance_prior_sum = self.instance_prior.sum()

        # initial estimates for the latent class of each vote
        if self.vote_classes is None:
            if self.C == self.R:
                # classes and responses align, so the vote itself is a sensible start
                self.vote_classes = votes.copy()
            else:
                self.vote_classes = self.rng.randint(0, self.C, self.V)

        # calculate vote weights: each (worker, instance) pair's votes are
        # down-weighted by how many votes that pair submitted. The void-dtype
        # view makes each row hashable so np.unique can count row occurrences.
        temp = np.vstack((workers, instances)).T
        temp = np.ascontiguousarray(temp).view(np.dtype((np.void, temp.dtype.itemsize * temp.shape[1])))
        _, unique_counts = np.unique(temp, return_counts=True)
        # NOTE(review): unique_counts is indexed by `instances`, which assumes the
        # sorted unique (worker, instance) rows align with instance indices -- verify.
        weights = 1. / unique_counts[instances]  # type: np.ndarray

        # initial counts (weighted co-occurrence tables the sampler maintains)
        counts_across_images = np.zeros(shape=(self.U, self.C, self.R))          # worker x class x response
        counts_across_workers_and_votes = np.zeros(shape=(self.I, self.C))       # instance x class
        for it_v in range(self.V):
            counts_across_images[workers[it_v], self.vote_classes[it_v], votes[it_v]] += weights[it_v]
            counts_across_workers_and_votes[instances[it_v], self.vote_classes[it_v]] += weights[it_v]
        counts_across_images_and_votes = counts_across_images.sum(axis=2)        # worker x class

        # set cl_transform: resolve each requested transform name to a callable
        transform = list()
        for tfm in self.transform:
            if tfm in (None, 'none'):
                transform.append(self.identity)
            elif tfm == 'clr':
                transform.append(clr)
            elif tfm == 'alr':
                transform.append(alr)
            elif tfm == 'ilr':
                # ilr needs a projection matrix of the right dimensionality
                transform.append(lambda comp: ilr(comp, mpm(self.C)))

        # LDA functions
        def get_data_like():
            # Per-vote likelihood with that vote's own (weighted) count removed,
            # i.e. the standard collapsed-Gibbs "minus self" form. Returns total log-likelihood.
            like = np.zeros(self.V)
            for it_v in range(self.V):
                i = instances[it_v]
                k = self.vote_classes[it_v]
                u = workers[it_v]
                v = votes[it_v]
                w = weights[it_v]  # type: np.ndarray
                like[it_v] = (counts_across_workers_and_votes[i, k] - w + self.instance_prior[k]) \
                    * (counts_across_images[u, k, v] - w + self.worker_prior[u, k, v]) \
                    / (counts_across_images_and_votes[u, k] - w + worker_prior_sum[u, k])
            return np.log(like).sum()

        def get_label_prob():
            # Conditional class distribution for the current vote; reads i, u, v
            # from the enclosing sampling loop (counts already decremented there).
            like = (counts_across_workers_and_votes[i, :] + self.instance_prior[:]) \
                * (counts_across_images[u, :, v] + self.worker_prior[u, :, v]) \
                / (counts_across_images_and_votes[u, :] + worker_prior_sum[u, :])
            return like / like.sum()

        def update_labels():
            # create update: current posterior label estimate per instance
            numerator = counts_across_workers_and_votes + self.instance_prior
            denominator = counts_across_workers_and_votes.sum(axis=1) + instance_prior_sum
            update = numerator / denominator[:, np.newaxis]
            for it, tfm in enumerate(transform):
                tfmupdate = tfm(update)
                if hasattr(self, 'samples'):
                    # NOTE(review): with several transforms this slot is overwritten
                    # by the last one, and `ep - self.burn_in` ignores starting_epoch -- confirm.
                    self.samples[ep - self.burn_in, :, :] = tfmupdate
                # update labels: running-mean increment; the divisor equals the
                # number of samples folded in so far (continuation runs line up
                # because starting_epoch shifts ep accordingly).
                delta = (tfmupdate - self.labels[it]) / (ep - self.burn_in + 1)
                self.labels[it] += delta
                # update labels_M2: running (biased) covariance via the delta outer
                # product; Bessel-corrected once at the end of the epoch loop.
                delta_cov = delta[:, :, np.newaxis] * delta[:, :, np.newaxis].transpose(0, 2, 1)
                self.labels_cov[it] += (ep - self.burn_in) * delta_cov - self.labels_cov[it] / (ep - self.burn_in + 1)

        def update_worker_mats():
            # create update: row-normalized posterior confusion matrix per worker
            numerator = counts_across_images + self.worker_prior
            denominator = counts_across_images.sum(axis=2) + worker_prior_sum
            update = numerator / denominator[:, :, np.newaxis]
            # update labels: same running-mean scheme as update_labels
            delta = (update - self.worker_mats) / (ep - self.burn_in + 1)
            self.worker_mats += delta

        # CLLDA main Gibbs loop
        start = time()
        for ep in range(starting_epoch, starting_epoch + self.num_epochs):
            # begin epoch
            print('starting epoch ' + str(ep + 1))
            if ep > starting_epoch:
                # NOTE(review): the ETA divides by ep rather than (ep - starting_epoch),
                # so it is biased for continuation runs -- cosmetic only.
                time_to_go = (time() - start) * (self.num_epochs - ep) / ep
                if time_to_go >= 3600:
                    print('Estimated time to finish: %.2f hours' % (time_to_go / 3600,))
                elif time_to_go >= 60:
                    print('Estimated time to finish: %.1f minutes' % (time_to_go / 60,))
                else:
                    print('Estimated time to finish: %.1f seconds' % (time_to_go,))
            ep_start = time()

            # gibbs sampling: resample every vote's latent class in random order
            for it_v in self.rng.permutation(self.V).astype(np.int64):
                # get correct indices
                i = instances[it_v]
                k = self.vote_classes[it_v]
                u = workers[it_v]
                v = votes[it_v]
                w = weights[it_v]
                # decrement counts (remove this vote's contribution before sampling)
                counts_across_images[u, k, v] -= w
                counts_across_workers_and_votes[i, k] -= w
                counts_across_images_and_votes[u, k] -= w
                # calculate probabilities of labels for this vote
                probs = get_label_prob()
                # sample new label
                k = self.rng.multinomial(1, probs).argmax()
                self.vote_classes[it_v] = k
                # increment counts under the newly sampled class
                counts_across_images[u, k, v] += w
                counts_across_workers_and_votes[i, k] += w
                counts_across_images_and_votes[u, k] += w

            # save information
            self.LL[ep] = get_data_like()
            # fold this epoch into the running averages once burn-in has passed
            if ep >= self.burn_in + starting_epoch:
                update_labels()
                update_worker_mats()

            # print epoch LL and duration
            print('Epoch completed in %.1f seconds' % (time() - ep_start,))
            print('LL: %.6f' % (self.LL[ep]))

        # adjust label covariances: Bessel correction (biased -> unbiased)
        self.labels_cov = [x * self.num_samples / (self.num_samples - 1.) for x in self.labels_cov]

        time_total = time() - start
        if time_total >= 3600:
            print('CLLDA completed in %.2f hours' % (time_total / 3600,))
        elif time_total >= 60:
            print('CLLDA completed in %.1f minutes' % (time_total / 60,))
        else:
            print('CLLDA completed in %.1f seconds' % (time_total,))

    #
    def update(self, votes, workers, instances, vote_ids=None, instance_ids=None, worker_ids=None, worker_prior=None,
               num_epochs=1000, burn_in=200):
        """
        Continue or restart inference with (possibly new) crowd-labeling data.

        If the data and id mappings are identical to the previous run, additional
        Gibbs samples are folded into the existing running averages; otherwise the
        saved vote-classes seed a fresh run on the new data.

        :param votes: List of vote values.
        :param workers: List of workers who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param vote_ids: (optional) List of vote ids; see __init__.
        :param instance_ids: (optional) List of instance ids; see __init__.
        :param worker_ids: (optional) List of worker ids; see __init__.
        :param worker_prior: (optional) New worker prior; required if worker_ids change.
        :param num_epochs: number of additional epochs to run for.
        :param burn_in: number of epochs to discard (fresh-data branch only).
        """

        # check that this is updateble
        assert self.updateable, 'This model is not updateable, presumable to conserve memory.'

        # determine IDs (remap raw values to 0-based indices, as in __init__)
        # for votes
        old_vote_ids = self.vote_ids.copy()  # type: np.ndarray
        if vote_ids is None:
            self.vote_ids = np.unique(votes)
            vote_dict = {y: x for x, y in enumerate(self.vote_ids)}
            votes = np.array([vote_dict[x] for x in votes])
        else:
            self.vote_ids = vote_ids
        # for instances
        old_instance_ids = self.instance_ids.copy()  # type: np.ndarray
        if instance_ids is None:
            self.instance_ids = np.unique(instances)
            instance_dict = {y: x for x, y in enumerate(self.instance_ids)}
            instances = np.array([instance_dict[x] for x in instances])
        else:
            self.instance_ids = instance_ids
        # for workers
        old_worker_ids = self.worker_ids.copy()  # type: np.ndarray
        if worker_ids is None:
            self.worker_ids = np.unique(workers)
            worker_dict = {y: x for x, y in enumerate(self.worker_ids)}
            workers = np.array([worker_dict[x] for x in workers])
        else:
            self.worker_ids = worker_ids

        # update parameters
        self.V = len(votes)
        self.U = len(np.unique(workers))
        self.I = len(np.unique(instances))
        self.num_epochs = num_epochs
        self.burn_in = burn_in

        # add more samples to previous solution (identical data and id maps)
        if np.array_equal(votes, self.votes) and np.array_equal(workers, self.workers) \
                and np.array_equal(instances, self.instances) and np.array_equal(self.vote_ids, old_vote_ids) \
                and np.array_equal(self.instance_ids, old_instance_ids) \
                and np.array_equal(self.worker_ids, old_worker_ids):
            # adjust label covariances: undo the Bessel correction so the running
            # (biased) form can keep accumulating in cllda()
            self.labels_cov = [x * (self.num_samples - 1.) / self.num_samples for x in self.labels_cov]

            # update parameters
            # NOTE(review): cllda() indexes self.LL[ep] from starting_epoch, so with
            # this concatenation some early slots may be rewritten -- confirm intent.
            self.LL = np.concatenate((self.LL, np.zeros(num_epochs)))
            old_num_samples = self.num_samples
            self.num_samples += num_epochs - burn_in
            self.votes = votes
            self.workers = workers
            self.instances = instances

            # update cllda; starting_epoch = old_num_samples - 1 makes the
            # running-mean divisors continue from the previous sample count
            self.cllda(votes, workers, instances, old_num_samples - 1)

        # keep only vote-classes and build off of them
        else:
            # insert old vote-classes and initialize new vote-classes
            old_vote_classes = self.vote_classes.copy()
            self.vote_classes = np.zeros_like(votes)
            # map (vote, worker, instance) triples from the previous run to their
            # sampled classes; unseen triples fall back to a fresh initialization
            old_dict = {y: x for x, y in enumerate(zip(self.votes, self.workers, self.instances))}
            for it, index in enumerate(zip(votes, workers, instances)):
                try:
                    self.vote_classes[it] = old_vote_classes[old_dict[index]]
                except KeyError:
                    if self.C == self.R:
                        self.vote_classes[it] = votes[it]
                    else:
                        self.vote_classes[it] = self.rng.randint(self.C)

            # adjust worker_prior if necessary
            if not np.array_equal(self.worker_ids, old_worker_ids):
                assert worker_prior is not None, "Worker priors must be provided if worker_ids change."
                self.worker_prior = np.array(worker_prior)
                if self.worker_prior.ndim == 2:
                    self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])

            # adjust info to save (reset accumulators for the new data)
            self.worker_mats = np.zeros((self.U, self.C, self.R))
            self.labels, self.labels_cov = list(), list()
            for transform in self.transform:
                if transform in (None, 'none', 'clr'):
                    self.labels.append(np.zeros((self.I, self.C)))
                    self.labels_cov.append(np.zeros((self.I, self.C, self.C)))
                elif transform in ('alr', 'ilr'):
                    self.labels.append(np.zeros((self.I, self.C - 1)))
                    self.labels_cov.append(np.zeros((self.I, self.C - 1, self.C - 1)))
                else:
                    raise Exception('Unknown transform!')

            # update parameters
            # NOTE(review): num_samples is set to num_epochs here (not
            # num_epochs - burn_in as in __init__) -- confirm this asymmetry.
            self.LL = np.zeros(num_epochs)
            self.num_samples = num_epochs

            # update cllda
            self.cllda(votes, workers, instances)
            self.votes = votes
            self.instances = instances
            self.workers = workers

    # no cl_transform
    @staticmethod
    def identity(compositional):
        """Pass-through used when no log-ratio transform is requested."""
        return compositional
|
|
|
+
|
|
|
+
|
|
|
def concurrent_cllda(models, votes, workers, instances, nprocs=4, **kwargs):
    """
    Effortless parallelization of multiple CLLDA models.

    :param models: If creating new models, an integer denoting how many models to create.
        Otherwise, a list of existing models to update.
    :param votes: List of vote values.
    :param workers: List of uses who submitted :param votes.
    :param instances: List of instances to which the :param votes pertain.
    :param nprocs: Number of processors to use in the parallel pool.
    :param kwargs: Other possible inputs to either CLLDA.__init__ or CLLDA.update
    :return: List of new or updated CLLDA models.
    :raises TypeError: if :param models is neither an int nor an iterable.
    """

    # validate input up front, before paying the cost of spinning up a pool.
    # BUG FIX: previously the TypeError was constructed but never raised, so
    # execution fell through to `return out` and died with a NameError.
    if isinstance(models, int):
        new_models = True
    elif hasattr(models, '__iter__'):
        new_models = False
    else:
        raise TypeError('Unknown type for input: models.')

    # open parallel pool
    print('Starting multiprocessing pool...')
    pool = mp.Pool(processes=nprocs)
    try:
        # run CL-LDA
        if new_models:
            print('Starting new CL-LDA models in parallel...')
            # optional master seed makes the per-model seeds reproducible
            if 'seed' in kwargs:
                np.random.seed(kwargs['seed'])
            # each model gets its own kwargs copy with an independent seed
            kwarg_list = [deepcopy(kwargs) for _ in range(models)]
            for kwa in kwarg_list:
                kwa['seed'] = np.random.randint(int(1e8))
            out = pool.map(_new_cllda, [(votes, workers, instances, kwa) for kwa in kwarg_list])
        else:
            print('Updating CL-LDA models in parallel...')
            out = pool.map(_update_cllda, [(model, votes, workers, instances, kwargs) for model in models])
    finally:
        # close parallel pool even if a worker raised (previously leaked on error)
        pool.close()
        print('Multiprocessing pool closed.')

    return out
|
|
|
+
|
|
|
+
|
|
|
def combine_cllda(models):
    """
    Combine multiple CLLDA instances.

    Averages the post-burn-in estimates of several independently run chains,
    weighting each model by its number of retained Gibbs samples.

    :param models: List of CLLDA models trained with the same settings.
    :return: CLLDA model which combines the input models.
    """

    # check models are equivalent.
    # BUG FIX: these checks previously used np.equal(...).any(), which passed
    # when ANY model matched (and misbehaved for a single-model list); all
    # models must agree with the first for the combination to be meaningful.
    first = models[0]
    assert all(model.V == first.V for model in models[1:]), 'Different number of votes!'
    assert all(model.U == first.U for model in models[1:]), 'Different number of workers!'
    assert all(model.I == first.I for model in models[1:]), 'Different number of instances!'
    assert all(model.C == first.C for model in models[1:]), 'Different number of classes!'
    assert all(model.R == first.R for model in models[1:]), 'Different number of responses!'
    assert all(np.array_equal(first.worker_prior, model.worker_prior)
               for model in models[1:]), 'Different worker priors!'
    assert all(np.array_equal(first.instance_prior, model.instance_prior)
               for model in models[1:]), 'Different instance priors!'
    assert np.all([models[0].transform == model.transform for model in models[1:]]), 'Different transforms!'

    # data info: start from a deep copy so the input models are left untouched
    out = deepcopy((models[0]))

    # combine label estimates: total retained samples across all chains
    out.num_samples = np.sum([model.num_samples for model in models])

    # combine worker estimates (sample-count-weighted average)
    out.worker_mats = np.sum([model.worker_mats * model.num_samples for model in models],
                             axis=0) / out.num_samples

    if all([x.updateable for x in models]):
        # majority vote across chains for each vote's latent class
        out.vote_classes = mode(np.stack([x.vote_classes for x in models]))[0].flatten()

    # combine labels and label covariances
    for it in range(len(models[0].transform)):
        out.labels[it] = np.sum([model.labels[it] * model.num_samples for model in models], 0) / out.num_samples
        # convert each model's unbiased covariance back to a second moment
        # (E[xx^T]) so moments can be pooled across chains
        labels_corrmat = [(model.num_samples - 1.) / model.num_samples * model.labels_cov[it]
                          + model.labels[it][..., np.newaxis] * model.labels[it][..., np.newaxis].transpose(0, 2, 1)
                          for model in models]
        out.labels_cov[it] = np.sum([corrmat * model.num_samples for model, corrmat in zip(models, labels_corrmat)],
                                    0) \
            / out.num_samples - out.labels[it][..., np.newaxis] * out.labels[it][
                ..., np.newaxis].transpose(0, 2, 1)

        # adjust label covariances (Bessel correction on the pooled count)
        out.labels_cov[it] *= out.num_samples / (out.num_samples - 1.)

    return out
|
|
|
+
|
|
|
+
|
|
|
+# map function
|
|
|
# map function
def _new_cllda(inputs):
    """Pool worker: unpack (votes, workers, instances, kwargs) and build a CLLDA."""
    votes, workers, instances, kwargs = inputs
    return CLLDA(votes, workers, instances, **kwargs)
|
|
|
+
|
|
|
+
|
|
|
+# map function
|
|
|
+def _update_cllda(inputs):
|
|
|
+ inputs[0].update(*inputs[1:4], **inputs[4])
|
|
|
+ return inputs[0]
|
|
|
+
|
|
|
+
|
|
|
+# if __name__ == '__main__':
|
|
|
+# # test suite
|
|
|
+# from DS import DS
|
|
|
+# test_data = DS.test_data()
|
|
|
+# CLLDA(test_data[0], test_data[1], test_data[2], num_epochs=10, burn_in=2, transform=('none', 'alr', 'ilr', 'clr'))
|
|
|
+# cls = concurrent_cllda(4, test_data[0], test_data[1], test_data[2],
|
|
|
+# num_epochs=10, burn_in=2, transform=('none', 'alr', 'ilr', 'clr'))
|
|
|
+# cl = combine_cllda(cls)
|
|
|
+# a=1
|