# Crowd Labeling Latent Dirichlet Allocation (CLLDA).
- import numpy as np
try:
    from scipy.stats import mode
except ImportError:
    # Fallback used only when scipy is unavailable.
    def mode(a, axis=0):
        """
        Return the most frequent value along an axis, with its count.

        Mirrors the historical scipy.stats.mode convention of returning a
        (modes, counts) pair of arrays shaped like ``a`` but with size 1
        along ``axis``.

        :param a: array-like of values.
        :param axis: axis along which to compute the mode (default 0).
        :return: tuple (mostfrequent, counts).
        """
        a = np.asarray(a)  # accept lists as well as ndarrays
        scores = np.unique(np.ravel(a))  # get ALL unique values
        testshape = list(a.shape)
        testshape[axis] = 1
        oldmostfreq = np.zeros(testshape)
        oldcounts = np.zeros(testshape)
        # fix: pre-assign so an empty input returns zeros instead of
        # raising NameError on the unbound loop variable
        mostfrequent = oldmostfreq
        for score in scores:
            template = (a == score)
            counts = np.expand_dims(np.sum(template, axis), axis)
            # keep the first-seen (lowest) score on ties, like scipy
            mostfrequent = np.where(counts > oldcounts, score, oldmostfreq)
            oldcounts = np.maximum(counts, oldcounts)
            oldmostfreq = mostfrequent
        return mostfrequent, oldcounts
- import multiprocessing as mp
- from time import time
- from copy import deepcopy
- from crowd_labeling.logratio_transformations import \
- centered_log_ratio_transform as clr, \
- isometric_log_ratio_transform as ilr, \
- additive_log_ratio_transform as alr, \
- make_projection_matrix as mpm
class CLLDA:
    """
    The :class: CLLDA is a python implementation of Crowd Labeling Latent Dirichlet Allocation.
    This algorithm processes crowd labeling (aka crowd consensus) data where workers label instances
    as pertaining to one or more classes. Allows for calculating resulting label estimates and
    covariances in multiple log-ratio transformed spaces.
    """

    def __init__(self, votes, workers, instances, vote_ids=None, worker_ids=None, instance_ids=None,
                 worker_prior=None, instance_prior=None, transform=None,
                 num_epochs=1000, burn_in=200, updateable=True, save_samples=False, seed=None):
        """
        Initializes settings for the model and automatically calls the inference function.
        :param votes: List of vote values.
        :param workers: List of uses who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param vote_ids: (optional) List of vote ids. If provided, :param votes should be a list of integers.
        :param instance_ids: (optional) List of instance ids. If provided, :param instances should be a list of integers.
        :param worker_ids: (optional) List of worker ids. If provided, :param workers should be a list of integers.
        :param worker_prior: (optional) Matrix prior for worker skill (pseudovotes).
        :param instance_prior: (optional) List of class priors (pseudovotes)
        :param transform: log-ratio transform to use.
        :param num_epochs: number of epochs to run for.
        :param burn_in: number of epochs to ignore for convergence of the Gibbs chain.
        :param updateable: If True, will save vote-classes between runs at the expense of memory.
        :param save_samples: option to save vote-classes after each epoch (very memory intensive).
        :param seed: seed for the random number generator if reproducibility is desired.
        """
        # set random seed
        if seed is None:
            seed = np.random.randint(int(1e8))
        self.rng = np.random.RandomState(seed=seed)

        # data info and priors
        # V = number of votes, U = number of distinct workers, I = number of distinct instances
        self.V = len(votes)
        self.U = len(np.unique(workers))
        self.I = len(np.unique(instances))
        if worker_prior is not None:
            self.worker_prior = np.array(worker_prior)
            if self.worker_prior.ndim == 2:
                # a single shared prior was given: tile it into a (U, C, R) stack
                self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])
            # C = number of latent classes, R = number of response options
            self.C = self.worker_prior.shape[1]
            self.R = self.worker_prior.shape[2]
        else:
            if vote_ids is not None:
                self.C = len(vote_ids)
                self.R = self.C
            else:
                self.R = len(np.unique(votes))
                self.C = self.R
            # default prior: diagonally dominant confusion matrix (3 pseudovotes per row)
            self.worker_prior = (np.eye(self.R) + np.ones((self.R, self.R)) / self.R) * 3
            self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])
        if instance_prior is None:
            # weak symmetric prior over classes
            self.instance_prior = np.ones(self.C) / self.C / 4
        else:
            self.instance_prior = instance_prior

        # determine vote IDs (re-code arbitrary vote values as 0-based integers)
        if vote_ids is None:
            self.vote_ids = np.unique(votes)
            vote_dict = {y: x for x, y in enumerate(self.vote_ids)}
            votes = np.array([vote_dict[x] for x in votes])
        else:
            self.vote_ids = vote_ids
        # determine instance IDs
        if instance_ids is None:
            self.instance_ids = np.unique(instances)
            instance_dict = {y: x for x, y in enumerate(self.instance_ids)}
            instances = np.array([instance_dict[x] for x in instances])
        else:
            self.instance_ids = instance_ids
        # determine worker IDs
        if worker_ids is None:
            self.worker_ids = np.unique(workers)
            worker_dict = {y: x for x, y in enumerate(self.worker_ids)}
            workers = np.array([worker_dict[x] for x in workers])
        else:
            self.worker_ids = worker_ids

        # cl_transform info (normalize to a tuple of transform names)
        if not isinstance(transform, str) and hasattr(transform, '__iter__'):
            self.transform = tuple(transform)
        else:
            self.transform = (transform,)

        # Gibbs sampling parameters
        self.num_epochs = num_epochs
        self.burn_in = burn_in
        # number of post-burn-in epochs that are averaged into the estimates
        self.num_samples = num_epochs - burn_in

        # info to save
        self.LL = np.nan * np.ones(self.num_epochs)  # per-epoch data log-likelihood
        self.worker_mats = np.zeros((self.U, self.C, self.R))  # running worker confusion estimates
        self.labels, self.labels_cov = list(), list()
        for transform in self.transform:
            if transform in (None, 'none', 'clr'):
                # full-dimensional spaces keep all C components
                self.labels.append(np.zeros((self.I, self.C)))
                self.labels_cov.append(np.zeros((self.I, self.C, self.C)))
            elif transform in ('alr', 'ilr'):
                # alr/ilr map the C-simplex to C-1 coordinates
                self.labels.append(np.zeros((self.I, self.C - 1)))
                self.labels_cov.append(np.zeros((self.I, self.C - 1, self.C - 1)))
            else:
                raise Exception('Unknown transform!')
        if save_samples:
            # NOTE(review): buffer is sized C - 1 even when only full-dimensional
            # transforms (none/clr) are requested — confirm intended.
            self.samples = np.zeros((self.num_epochs - self.burn_in, self.I, self.C - 1))
        self.updateable = updateable
        self.vote_classes = None

        # estimate label means and covariances using cllda
        self.cllda(votes, workers, instances)

        # clean up
        if not self.updateable:
            self.vote_classes = None
        else:
            # keep the integer-coded data so the model can be updated later
            self.votes = votes
            self.instances = instances
            self.workers = workers

    # CLLDA optimization using Gibbs sampling
    def cllda(self, votes, workers, instances, starting_epoch=0):
        """
        Performs inference on the :class: CLLDA model.
        :param votes: List of vote values.
        :param workers: List of workers who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param starting_epoch: How many epochs have already been incorporated in the averages.
        """
        # precalculate
        worker_prior_sum = self.worker_prior.sum(axis=2)
        instance_prior_sum = self.instance_prior.sum()

        # initial estimates
        if self.vote_classes is None:
            if self.C == self.R:
                # one latent class per response option: seed with the observed votes
                self.vote_classes = votes.copy()
            else:
                self.vote_classes = self.rng.randint(0, self.C, self.V)

        # calculate vote weights: each (worker, instance) pair's votes share unit weight
        temp = np.vstack((workers, instances)).T
        # view rows as single void items so np.unique operates row-wise
        temp = np.ascontiguousarray(temp).view(np.dtype((np.void, temp.dtype.itemsize * temp.shape[1])))
        _, unique_counts = np.unique(temp, return_counts=True)
        # NOTE(review): unique_counts is indexed by instance id, but its length is the
        # number of unique (worker, instance) rows — looks like this should index by
        # the inverse mapping from np.unique(..., return_inverse=True); verify.
        weights = 1. / unique_counts[instances]  # type: np.ndarray

        # initial counts
        counts_across_images = np.zeros(shape=(self.U, self.C, self.R))
        counts_across_workers_and_votes = np.zeros(shape=(self.I, self.C))
        for it_v in range(self.V):
            counts_across_images[workers[it_v], self.vote_classes[it_v], votes[it_v]] += weights[it_v]
            counts_across_workers_and_votes[instances[it_v], self.vote_classes[it_v]] += weights[it_v]
        counts_across_images_and_votes = counts_across_images.sum(axis=2)

        # set cl_transform (map names to callables)
        transform = list()
        for tfm in self.transform:
            if tfm in (None, 'none'):
                transform.append(self.identity)
            elif tfm == 'clr':
                transform.append(clr)
            elif tfm == 'alr':
                transform.append(alr)
            elif tfm == 'ilr':
                transform.append(lambda comp: ilr(comp, mpm(self.C)))

        # LDA functions
        def get_data_like():
            # log-likelihood of all votes under the current class assignments;
            # each vote's own weight w is subtracted out of the counts first
            like = np.zeros(self.V)
            for it_v in range(self.V):
                i = instances[it_v]
                k = self.vote_classes[it_v]
                u = workers[it_v]
                v = votes[it_v]
                w = weights[it_v]  # type: np.ndarray
                like[it_v] = (counts_across_workers_and_votes[i, k] - w + self.instance_prior[k]) \
                    * (counts_across_images[u, k, v] - w + self.worker_prior[u, k, v]) \
                    / (counts_across_images_and_votes[u, k] - w + worker_prior_sum[u, k])
            return np.log(like).sum()

        def get_label_prob():
            # conditional probability of each class for the current vote;
            # reads i, u, v from the enclosing sampling loop (counts already decremented)
            like = (counts_across_workers_and_votes[i, :] + self.instance_prior[:]) \
                * (counts_across_images[u, :, v] + self.worker_prior[u, :, v]) \
                / (counts_across_images_and_votes[u, :] + worker_prior_sum[u, :])
            return like / like.sum()

        def update_labels():
            # create update: posterior class proportions per instance
            numerator = counts_across_workers_and_votes + self.instance_prior
            denominator = counts_across_workers_and_votes.sum(axis=1) + instance_prior_sum
            update = numerator / denominator[:, np.newaxis]
            for it, tfm in enumerate(transform):
                tfmupdate = tfm(update)
                if hasattr(self, 'samples'):
                    self.samples[ep - self.burn_in, :, :] = tfmupdate
                # update labels (running mean over post-burn-in samples)
                delta = (tfmupdate - self.labels[it]) / (ep - self.burn_in + 1)
                self.labels[it] += delta
                # update labels_M2 (Welford-style running second moment)
                delta_cov = delta[:, :, np.newaxis] * delta[:, :, np.newaxis].transpose(0, 2, 1)
                self.labels_cov[it] += (ep - self.burn_in) * delta_cov - self.labels_cov[it] / (ep - self.burn_in + 1)

        def update_worker_mats():
            # create update: posterior worker confusion matrices
            numerator = counts_across_images + self.worker_prior
            denominator = counts_across_images.sum(axis=2) + worker_prior_sum
            update = numerator / denominator[:, :, np.newaxis]
            # update running mean of worker confusion estimates
            delta = (update - self.worker_mats) / (ep - self.burn_in + 1)
            self.worker_mats += delta

        # CLLDA
        start = time()
        for ep in range(starting_epoch, starting_epoch + self.num_epochs):
            # begin epoch
            print('starting epoch ' + str(ep + 1))
            if ep > starting_epoch:
                time_to_go = (time() - start) * (self.num_epochs - ep) / ep
                if time_to_go >= 3600:
                    print('Estimated time to finish: %.2f hours' % (time_to_go / 3600,))
                elif time_to_go >= 60:
                    print('Estimated time to finish: %.1f minutes' % (time_to_go / 60,))
                else:
                    print('Estimated time to finish: %.1f seconds' % (time_to_go,))
            ep_start = time()

            # gibbs sampling: resample each vote's latent class in random order
            for it_v in self.rng.permutation(self.V).astype(np.int64):
                # get correct indices
                i = instances[it_v]
                k = self.vote_classes[it_v]
                u = workers[it_v]
                v = votes[it_v]
                w = weights[it_v]
                # decrement counts (remove this vote's contribution at its old class)
                counts_across_images[u, k, v] -= w
                counts_across_workers_and_votes[i, k] -= w
                counts_across_images_and_votes[u, k] -= w
                # calculate probabilities of labels for this vote
                probs = get_label_prob()
                # sample new label
                k = self.rng.multinomial(1, probs).argmax()
                self.vote_classes[it_v] = k
                # increment counts (re-add at the newly sampled class)
                counts_across_images[u, k, v] += w
                counts_across_workers_and_votes[i, k] += w
                counts_across_images_and_votes[u, k] += w

            # save information
            self.LL[ep] = get_data_like()
            if ep >= self.burn_in + starting_epoch:
                update_labels()
                update_worker_mats()

            # print epoch LL and duration
            print('Epoch completed in %.1f seconds' % (time() - ep_start,))
            print('LL: %.6f' % (self.LL[ep]))

        # adjust label covariances (unbiased n-1 normalization)
        self.labels_cov = [x * self.num_samples / (self.num_samples - 1.) for x in self.labels_cov]

        time_total = time() - start
        if time_total >= 3600:
            print('CLLDA completed in %.2f hours' % (time_total / 3600,))
        elif time_total >= 60:
            print('CLLDA completed in %.1f minutes' % (time_total / 60,))
        else:
            print('CLLDA completed in %.1f seconds' % (time_total,))

    def update(self, votes, workers, instances, vote_ids=None, instance_ids=None, worker_ids=None, worker_prior=None,
               num_epochs=1000, burn_in=200):
        """
        Re-runs inference with new (or extended) data on a previously trained model.
        If the data and id mappings are unchanged, extends the existing Gibbs chain;
        otherwise restarts the averages while reusing matching vote-class assignments.
        :param votes: List of vote values.
        :param workers: List of workers who submitted :param votes.
        :param instances: List of instances to which the :param votes pertain.
        :param vote_ids: (optional) List of vote ids.
        :param instance_ids: (optional) List of instance ids.
        :param worker_ids: (optional) List of worker ids.
        :param worker_prior: (optional) Matrix prior for worker skill; required if worker_ids change.
        :param num_epochs: number of epochs to run for.
        :param burn_in: number of epochs to ignore for convergence of the Gibbs chain.
        """
        # check that this is updateble
        assert self.updateable, 'This model is not updateable, presumable to conserve memory.'

        # determine IDs
        # for votes
        old_vote_ids = self.vote_ids.copy()  # type: np.ndarray
        if vote_ids is None:
            self.vote_ids = np.unique(votes)
            vote_dict = {y: x for x, y in enumerate(self.vote_ids)}
            votes = np.array([vote_dict[x] for x in votes])
        else:
            self.vote_ids = vote_ids
        # for instances
        old_instance_ids = self.instance_ids.copy()  # type: np.ndarray
        if instance_ids is None:
            self.instance_ids = np.unique(instances)
            instance_dict = {y: x for x, y in enumerate(self.instance_ids)}
            instances = np.array([instance_dict[x] for x in instances])
        else:
            self.instance_ids = instance_ids
        # for workers
        old_worker_ids = self.worker_ids.copy()  # type: np.ndarray
        if worker_ids is None:
            self.worker_ids = np.unique(workers)
            worker_dict = {y: x for x, y in enumerate(self.worker_ids)}
            workers = np.array([worker_dict[x] for x in workers])
        else:
            self.worker_ids = worker_ids

        # update parameters
        self.V = len(votes)
        self.U = len(np.unique(workers))
        self.I = len(np.unique(instances))
        self.num_epochs = num_epochs
        self.burn_in = burn_in

        # add more samples to previous solution
        if np.array_equal(votes, self.votes) and np.array_equal(workers, self.workers) \
                and np.array_equal(instances, self.instances) and np.array_equal(self.vote_ids, old_vote_ids) \
                and np.array_equal(self.instance_ids, old_instance_ids) \
                and np.array_equal(self.worker_ids, old_worker_ids):
            # adjust label covariances (undo the n-1 normalization before accumulating more samples)
            self.labels_cov = [x * (self.num_samples - 1.) / self.num_samples for x in self.labels_cov]
            # update parameters
            self.LL = np.concatenate((self.LL, np.zeros(num_epochs)))
            old_num_samples = self.num_samples
            self.num_samples += num_epochs - burn_in
            self.votes = votes
            self.workers = workers
            self.instances = instances
            # update cllda
            # NOTE(review): starting_epoch is the prior sample count minus one, not the
            # prior epoch count — confirm the epoch/LL indexing in cllda is consistent.
            self.cllda(votes, workers, instances, old_num_samples - 1)
        # keep only vote-classes and build off of them
        else:
            # insert old vote-classes and initialize new vote-classes
            old_vote_classes = self.vote_classes.copy()
            self.vote_classes = np.zeros_like(votes)
            old_dict = {y: x for x, y in enumerate(zip(self.votes, self.workers, self.instances))}
            for it, index in enumerate(zip(votes, workers, instances)):
                try:
                    # reuse the assignment from the matching old (vote, worker, instance) triple
                    self.vote_classes[it] = old_vote_classes[old_dict[index]]
                except KeyError:
                    if self.C == self.R:
                        self.vote_classes[it] = votes[it]
                    else:
                        self.vote_classes[it] = self.rng.randint(self.C)
            # adjust worker_prior if necessary
            if not np.array_equal(self.worker_ids, old_worker_ids):
                assert worker_prior is not None, "Worker priors must be provided if worker_ids change."
                self.worker_prior = np.array(worker_prior)
                if self.worker_prior.ndim == 2:
                    self.worker_prior = np.tile(self.worker_prior[np.newaxis, :, :], [self.U, 1, 1])
            # adjust info to save
            self.worker_mats = np.zeros((self.U, self.C, self.R))
            self.labels, self.labels_cov = list(), list()
            for transform in self.transform:
                if transform in (None, 'none', 'clr'):
                    self.labels.append(np.zeros((self.I, self.C)))
                    self.labels_cov.append(np.zeros((self.I, self.C, self.C)))
                elif transform in ('alr', 'ilr'):
                    self.labels.append(np.zeros((self.I, self.C - 1)))
                    self.labels_cov.append(np.zeros((self.I, self.C - 1, self.C - 1)))
                else:
                    raise Exception('Unknown transform!')
            # update parameters
            self.LL = np.zeros(num_epochs)
            # NOTE(review): __init__ sets num_samples = num_epochs - burn_in;
            # here it is num_epochs — looks inconsistent, verify.
            self.num_samples = num_epochs
            # update cllda
            self.cllda(votes, workers, instances)
            self.votes = votes
            self.instances = instances
            self.workers = workers

    # no cl_transform
    @staticmethod
    def identity(compositional):
        """Pass-through used in place of a log-ratio transform."""
        return compositional
def concurrent_cllda(models, votes, workers, instances, nprocs=4, **kwargs):
    """
    Effortless parallelization of multiple CLLDA models.
    :param models: If creating new models, an integer denoting how many models to create.
        Otherwise, a list of existing models to update.
    :param votes: List of vote values.
    :param workers: List of uses who submitted :param votes.
    :param instances: List of instances to which the :param votes pertain.
    :param nprocs: Number of processors to use in the parallel pool.
    :param kwargs: Other possible inputs to either CLLDA.__init__ or CLLDA.update
    :return: List of new or updated CLLDA models.
    :raises TypeError: if :param models is neither an int nor an iterable of models.
    """
    # validate input up front, before paying the cost of spawning worker
    # processes (the original built the TypeError but never raised it, then
    # fell through to an unbound `out`)
    if isinstance(models, int):
        new_models = True
    elif hasattr(models, '__iter__'):
        new_models = False
    else:
        raise TypeError('Unknown type for input: models.')

    # open parallel pool
    print('Starting multiprocessing pool...')
    pool = mp.Pool(processes=nprocs)
    try:
        # run CL-LDA
        if new_models:
            print('Starting new CL-LDA models in parallel...')
            # give each model a distinct but reproducible seed when one is provided
            if 'seed' in kwargs:
                np.random.seed(kwargs['seed'])
            kwargs = [deepcopy(kwargs) for _ in range(models)]
            for it in range(models):
                kwargs[it]['seed'] = np.random.randint(int(1e8))
            out = pool.map(_new_cllda, [(votes, workers, instances, kwa) for kwa in kwargs])
        else:
            print('Updating CL-LDA models in parallel...')
            out = pool.map(_update_cllda, [(model, votes, workers, instances, kwargs) for model in models])
    finally:
        # close parallel pool even if a worker raised
        pool.close()
        print('Multiprocessing pool closed.')
    return out
def combine_cllda(models):
    """
    Combine multiple CLLDA instances.
    :param models: List of CLLDA models trained with the same settings.
    :return: CLLDA model which combines the input models.
    :raises AssertionError: if the models were not trained with equivalent settings.
    """
    # check models are equivalent — every model must match the first one.
    # fix: use .all(), not .any(); with .any() a single matching model let
    # mismatched models slip through the check.
    assert np.equal(models[0].V, [model.V for model in models[1:]]).all(), 'Different number of votes!'
    assert np.equal(models[0].U, [model.U for model in models[1:]]).all(), 'Different number of workers!'
    assert np.equal(models[0].I, [model.I for model in models[1:]]).all(), 'Different number of instances!'
    assert np.equal(models[0].C, [model.C for model in models[1:]]).all(), 'Different number of classes!'
    assert np.equal(models[0].R, [model.R for model in models[1:]]).all(), 'Different number of responses!'
    assert np.equal(models[0].worker_prior,
                    [model.worker_prior for model in models[1:]]).all(), 'Different worker priors!'
    assert np.equal(models[0].instance_prior,
                    [model.instance_prior for model in models[1:]]).all(), 'Different instance priors!'
    assert np.all([models[0].transform == model.transform for model in models[1:]]), 'Different transforms!'

    # data info: start from a deep copy of the first model
    out = deepcopy((models[0]))

    # combine label estimates: total retained Gibbs samples across models
    out.num_samples = np.sum([model.num_samples for model in models])

    # combine worker estimates (sample-weighted average of confusion matrices)
    out.worker_mats = np.sum([model.worker_mats * model.num_samples for model in models],
                             axis=0) / out.num_samples

    # majority vote across models for the per-vote class assignments
    if all([x.updateable for x in models]):
        out.vote_classes = mode(np.stack([x.vote_classes for x in models]))[0].flatten()

    # combine labels and label covariances
    for it in range(len(models[0].transform)):
        # sample-weighted mean of the label estimates
        out.labels[it] = np.sum([model.labels[it] * model.num_samples for model in models], 0) / out.num_samples
        # convert each model's unbiased covariance back to a second-moment matrix E[xx^T]
        labels_corrmat = [(model.num_samples - 1.) / model.num_samples * model.labels_cov[it]
                          + model.labels[it][..., np.newaxis] * model.labels[it][..., np.newaxis].transpose(0, 2, 1)
                          for model in models]
        # pooled covariance = weighted second moment minus outer product of the pooled mean
        out.labels_cov[it] = (np.sum([corrmat * model.num_samples
                                      for model, corrmat in zip(models, labels_corrmat)], 0) / out.num_samples
                              - out.labels[it][..., np.newaxis] * out.labels[it][..., np.newaxis].transpose(0, 2, 1))
        # adjust label covariances (restore unbiased n-1 normalization)
        out.labels_cov[it] *= out.num_samples / (out.num_samples - 1.)
    return out
# map function
def _new_cllda(inputs):
    """Unpack a (votes, workers, instances, kwargs) tuple and train a fresh CLLDA model."""
    votes, workers, instances, kwargs = inputs
    return CLLDA(votes, workers, instances, **kwargs)
- # map function
- def _update_cllda(inputs):
- inputs[0].update(*inputs[1:4], **inputs[4])
- return inputs[0]
- # if __name__ == '__main__':
- # # test suite
- # from DS import DS
- # test_data = DS.test_data()
- # CLLDA(test_data[0], test_data[1], test_data[2], num_epochs=10, burn_in=2, transform=('none', 'alr', 'ilr', 'clr'))
- # cls = concurrent_cllda(4, test_data[0], test_data[1], test_data[2],
- # num_epochs=10, burn_in=2, transform=('none', 'alr', 'ilr', 'clr'))
- # cl = combine_cllda(cls)
- # a=1
|