"""Source code for mgplvm.syndata.gen_data."""

import abc
from scipy.linalg import norm
import numpy as np
from typing import List
import matplotlib.pyplot as plt
import subprocess
import torch
from torch.distributions import Bernoulli


# %% base class
def draw_GP(n, d, n_samples, sig, ell, jitter=1e-6):
    """Draw samples from an RBF-kernel GP on a regular grid of n time points.

    Parameters
    ----------
    n : number of (evenly spaced) sample points per draw
    d : number of independent output dimensions
    n_samples : number of independent GP draws
    sig : kernel amplitude (output scale)
    ell : kernel lengthscale, in units of samples
    jitter : diagonal term added to the kernel for numerical stability

    Returns
    -------
    ndarray of shape (n_samples, n, d)
    """
    ts = np.arange(n)
    # pairwise time differences t_i - t_j  (n x n)
    dts = ts[:, None] - ts[None, :]
    # RBF covariance plus jitter so the Cholesky factorization succeeds
    K = sig**2 * np.exp(-dts**2 / (2 * ell**2)) + jitter * np.eye(n)
    chol = np.linalg.cholesky(K)  # n x n lower-triangular factor
    # color standard-normal draws through the Cholesky factor;
    # (n, n) @ (n_samples, n, d) broadcasts to (n_samples, n, d)
    white = np.random.normal(size=(n_samples, n, d))
    return chol @ white
[docs]class Manif(metaclass=abc.ABCMeta): def __init__(self, d): self.d = d
[docs] @abc.abstractmethod def name(self): pass
[docs] @abc.abstractmethod def gen(self, n, n_samples, ell, sig): pass
[docs] @abc.abstractmethod def distance(self, x, y): pass
# %% Euclidean class
class Euclid(Manif):
    """d-dimensional Euclidean latent space R^d."""

    def __init__(self, d):
        super().__init__(d)

    @property
    def name(self):
        return "euclid(%i)" % self.d

    def gen(self, n, n_samples, ell=None, sig=10, prefs=False):
        """Sample latents; GP-correlated when ell is given, i.i.d. otherwise.

        With prefs=True (and ell None) points are drawn uniformly from a
        box, which spreads preferred directions over the space.
        Returns an (n_samples, n, d) array centered across the sample axis.
        """
        if ell is not None:
            latents = draw_GP(n, self.d, n_samples, sig, ell)
        elif prefs:
            latents = np.random.uniform(-2.5, 2.5,
                                        size=(n_samples, n, self.d))
        else:
            latents = np.random.normal(0, 1, size=(n_samples, n, self.d))
        # center across the sample dimension
        return latents - np.mean(latents, axis=0)

    def gen_ginit(self, n, n_samples):
        """All-zero initialization (the origin of R^d)."""
        return np.zeros((n_samples, n, self.d))

    def noisy_conds(self, gs, variability):
        """Perturb conditions with Gaussian noise scaled by their spread."""
        jitter = np.random.normal(0, np.std(gs) * variability, size=gs.shape)
        return gs + jitter

    def distance(self, x, y):
        """Pairwise squared Euclidean distances.

        For x of shape (..., n, d) and y of shape (..., m, d) the result
        has shape (..., n, m).
        """
        diffs = x[:, :, None, ...] - y[:, None, ...]
        return np.sum(diffs**2, axis=-1)
class Torus(Manif):
    """d-dimensional torus: each coordinate is an angle in [0, 2*pi)."""

    def __init__(self, d):
        super().__init__(d)

    @property
    def name(self):
        # bug fix: the original "torus(d)" % self.d had no conversion
        # specifier and raised TypeError; use "%i" as Euclid does.
        return "torus(%i)" % self.d

    def norm(self, gs):
        """Wrap angles back onto [0, 2*pi)."""
        return gs % (2 * np.pi)

    def gen(self, n, n_samples, ell=None, sig=10, prefs=False):
        """If ell is None, draw uniform angles; otherwise sample an RBF GP
        with lengthscale ell and wrap the result back onto the torus.

        Returns an (n_samples, n, d) array of angles in [0, 2*pi).
        """
        if ell is None:
            gs = np.random.uniform(0, 2 * np.pi, size=(n_samples, n, self.d))
        else:
            gs = draw_GP(n, self.d, n_samples, sig, ell)
            # shift by a large multiple of 2*pi before taking the modulus
            # so negative GP draws land back on the torus
            gs = (gs + np.ceil(10 * sig) * 2 * np.pi) % (2 * np.pi)
        return gs

    def gen_ginit(self, n, n_samples):
        """Initialize every condition at angle pi on every circle."""
        return np.ones((n_samples, n, self.d)) * np.pi

    def noisy_conds(self, gs, variability):
        """Perturb with Gaussian noise and wrap back onto the torus."""
        gs = gs + np.random.normal(0, np.std(gs) * variability, size=gs.shape)
        return self.norm(gs)

    def distance(self, x, y):
        """Pairwise squared chordal distances summed over circles:
        |e^{ix} - e^{iy}|^2 = 2 - 2 cos(x - y)."""
        return np.sum(2 - 2 * np.cos(x[:, :, None, ...] - y[:, None, ...]),
                      axis=-1)
# %% Sphere class
class Sphere(Manif):
    """Unit d-sphere embedded in R^(d+1)."""

    def __init__(self, d):
        super().__init__(d)

    @property
    def name(self):
        # bug fix: the original "sphere(d)" % self.d had no conversion
        # specifier and raised TypeError; use "%i" as Euclid does.
        return "sphere(%i)" % self.d

    def norm(self, gs):
        """Project each point onto the unit sphere.

        bug fix: normalize over the embedding (last) axis rather than
        axis=1 (the condition axis), so every point has unit length.
        """
        return gs / norm(gs, axis=-1, keepdims=True)

    def noisy_conds(self, gs, variability):
        """Perturb with Gaussian noise and re-project onto the sphere."""
        gs = gs + np.random.normal(0, np.std(gs) * variability, size=gs.shape)
        return self.norm(gs)

    def gen(self, n, n_samples, ell=None, sig=None, prefs=False):
        """Generate points uniformly on the sphere (normalized Gaussians).

        ell, sig and prefs are accepted for interface compatibility with
        the other manifolds but are unused here.
        """
        gs = np.random.normal(0, 1, size=(n_samples, n, self.d + 1))
        return self.norm(gs)

    def gen_ginit(self, n, n_samples):
        """Initialize every condition at the 'north pole' (1, 0, ..., 0).

        bug fix: set the first embedding coordinate (gs[..., 0]) rather
        than the first condition (gs[:, 0]); the original produced points
        that were not on the sphere.
        """
        gs = np.zeros((n_samples, n, self.d + 1))
        gs[..., 0] = 1
        return gs

    def distance(self, x, y):
        """Pairwise squared chordal distances |x - y|^2 = 2(1 - <x, y>)
        for unit vectors.

        (The original body contained a dead, unassigned expression
        4 * (1 - <x, y>^2) before the return; it has been removed.)
        """
        return 2 * (1 - np.sum(x[:, :, None, ...] * y[:, None, ...], axis=-1))
# %% SO(3) class
class So3(Manif):
    """SO(3) parameterized by unit quaternions (upper hemisphere of S^3)."""

    def __init__(self):
        super().__init__(3)

    @property
    def name(self):
        return "SO(3)"

    def norm(self, gs):
        """Project onto unit quaternions with non-negative real part.

        bug fix: normalize over the embedding (last) axis rather than
        axis=1 (the condition axis), so every quaternion has unit length.
        The sign flip picks the representative with gs[..., 0] >= 0, since
        q and -q encode the same rotation.
        """
        gs = gs / norm(gs, axis=-1, keepdims=True)
        return gs * np.sign(gs[..., :1])

    def gen(self, n, n_samples, ell=None, sig=None, prefs=False):
        """Generate rotations uniformly (normalized Gaussian quaternions).

        ell, sig and prefs are accepted for interface compatibility with
        the other manifolds but are unused here.
        """
        gs = np.random.normal(0, 1, size=(n_samples, n, self.d + 1))
        return self.norm(gs)

    def gen_ginit(self, n, n_samples):
        """Initialize every condition at the identity quaternion (1, 0, 0, 0).

        bug fix: set the first quaternion component (gs[..., 0]) rather
        than the first condition (gs[:, 0]); the original produced
        non-unit quaternions.
        """
        gs = np.zeros((n_samples, n, self.d + 1))
        gs[..., 0] = 1
        return gs

    def noisy_conds(self, gs, variability):
        """Perturb with Gaussian noise and re-project onto unit quaternions."""
        gs = gs + np.random.normal(0, np.std(gs) * variability, size=gs.shape)
        return self.norm(gs)

    def distance(self, x, y):
        """Pairwise distances 4 * (1 - <x, y>) between quaternion sets."""
        return 4 * (1 - np.sum(x[:, :, None, ...] * y[:, None, ...], axis=-1))
# %% Product class
class Product(Manif):
    """Product of base manifolds (products of products are not supported).

    Latent points are represented as a list with one array per factor
    manifold; every operation simply delegates to the factors.
    """

    def __init__(self, manifs):
        self.manifs = manifs
        self.ds = [m.d for m in manifs]
        # total latent dimensionality is the sum over factors
        self.d = sum(self.ds)

    @property
    def name(self):
        return "x".join([m.name for m in self.manifs])

    def gen(self, n, n_samples, ell=None, sig=10, prefs=False):
        """Sample latents independently on each factor manifold."""
        return [
            m.gen(n, n_samples, ell=ell, sig=sig, prefs=prefs)
            for m in self.manifs
        ]

    def gen_ginit(self, n, n_samples):
        """Generate a series of points at the origin of each factor."""
        return [m.gen_ginit(n, n_samples) for m in self.manifs]

    def add_tangent_vector(self, gs, d, delta):
        """Delegate tangent-vector updates to each factor manifold."""
        return [
            m.add_tangent_vector(g, d, delta)
            for m, g in zip(self.manifs, gs)
        ]

    def distance(self, xs, ys):
        """Per-factor pairwise distances (returned as a list of arrays)."""
        return [m.distance(x, y) for m, x, y in zip(self.manifs, xs, ys)]

    def distance_scaled(self, xs, ys, ls):
        """Per-factor distances divided by the squared lengthscales ls."""
        return [
            m.distance(x, y) / l**2
            for m, x, y, l in zip(self.manifs, xs, ys, ls)
        ]
# %% generator class
class Gen():
    """Synthetic-data generator: Gaussian-bump tuning curves on a manifold.

    Holds a (product) manifold, per-neuron tuning parameters, and cached
    latent conditions / preferred directions, and produces noisy neural
    activity via gen_data().
    """

    def __init__(self,
                 manifold,
                 n,
                 m,
                 l=0.5,
                 alpha=1,
                 beta=0.3,
                 sigma=0.1,
                 variability=0.1,
                 n_samples=1):
        '''
        initialize a data-generating object

        manifold is an instance of a manifold class or a list of manifolds;
        n is the number of neurons, m the number of conditions (time points).
        l, alpha, beta, sigma are tuning-curve parameters (lengthscale,
        amplitude, baseline, observation noise); variability sets the
        relative across-neuron spread of each parameter.
        '''
        # make sure everything is in list format to allow for product kernels etc.
        if isinstance(manifold, Manif):
            manifold = [manifold]
        self.nman = len(manifold)
        # always wrap in a Product so downstream code sees list-valued latents
        self.manifold = Product(manifold)
        self.variability = variability
        self.n_samples = n_samples
        self.n, self.m = n, m
        # caches for preferred directions and latent conditions
        self.gprefs = []
        self.gs = []
        self.params = {}
        # draw per-neuron parameter values around the given means
        for (param, value) in [('alpha', alpha), ('beta', beta),
                               ('sigma', sigma), ('l', l)]:
            self.set_param(param, value)

    def get_params(self):
        '''return the dict of per-neuron parameters'''
        return self.params

    def set_param(self, param, value):
        '''set the value of a parameter, adding across-neuron variability'''
        if param == "l":
            # need separate lengthscale per neuron per manifold
            if type(value) in [int, float, np.float64]:
                # one length scale for each manifold
                value = [value for i in range(self.nman)]
            # (n, 1) lengthscales per manifold, jittered across neurons
            value = [
                np.ones((self.n, 1)) * val +
                np.random.normal(0, val * self.variability, size=(self.n, 1))
                for val in value
            ]
        elif param == 'beta':
            # one param for each neuron, drawn uniformly in [0, value)
            value = np.random.uniform(0, value, size=(self.n, 1))  # uniform
        else:
            # (n, 1) values jittered around the given mean
            value = np.ones((self.n, 1))*value + \
                np.random.normal(0, value*self.variability, size=(self.n, 1))
        # save result
        self.params[param] = value

    def gen_gprefs(self):
        '''generate preferred directions for each neuron (cached)'''
        self.gprefs = self.manifold.gen(self.n, self.n_samples, prefs=True)
        return self.gprefs

    def gen_gconds(self, ell=None, sig=10):
        '''generate m latent conditions on the manifold (cached);
        GP-correlated when ell is given'''
        self.gs = self.manifold.gen(self.m, self.n_samples, ell=ell, sig=sig)
        return self.gs

    def noisy_conds(self):
        '''return a noise-perturbed copy of the cached conditions,
        one array per factor manifold'''
        gs = [
            self.manifold.manifs[i].noisy_conds(self.gs[i], self.variability)
            for i in range(self.nman)
        ]
        return gs

    def gen_data(self,
                 gs_in=None,
                 gprefs_in=None,
                 mode='Gaussian',
                 overwrite=True,
                 sigma=None,
                 ell=None,
                 sig=10,
                 rate=10):
        """Generate synthetic activity Y of shape (n_samples, n, m).

        gs_in / gprefs_in optionally supply latent conditions and preferred
        directions; otherwise cached values are reused or freshly drawn.
        mode is 'Gaussian' (additive noise with per-neuron std sigma) or
        'Poisson' (rates rescaled so peak activity is roughly `rate`).
        With overwrite=False the cached gprefs/gs are restored afterwards.
        The noiseless activities are stored in self.fs, the output in self.Y.
        """
        if not overwrite:
            gprefs_backup, gs_backup = self.gprefs, self.gs
        ### Generating G according to p(G)
        if gs_in is None:
            if len(self.gs) == 0:
                self.gen_gconds(ell=ell, sig=sig)
        else:
            self.gs = gs_in
        if gprefs_in is None:
            if len(self.gprefs) == 0:
                self.gen_gprefs()
        else:
            self.gprefs = gprefs_in
        ### Generating according to p(F | G)
        ds_sqr = np.array(
            self.manifold.distance_scaled(
                self.gprefs, self.gs,
                self.params['l']))  # nman x n_samples x n x m
        Ks = np.exp(-0.5 * ds_sqr)  # nman x n_samples x n x m
        # product kernel across the factor manifolds
        K = np.prod(Ks, axis=0)  # n_samples x n x m
        n_samples, n, m = K.shape
        # per-neuron amplitude and baseline broadcast over conditions
        fs = self.params['alpha'] * K + self.params['beta']
        self.fs = fs  # store denoised activities
        ### Generating according to p(Y | F)
        if mode == 'Gaussian':
            # add Gaussian noise; sigma is (n, 1), tiled to (n_samples, n, m)
            sigma = self.params['sigma'] if sigma is None else sigma
            noise = np.random.normal(
                0,
                np.repeat(np.repeat(sigma[None, ...], m, axis=-1),
                          n_samples,
                          axis=0))
            self.Y = fs + noise
        elif mode == 'Poisson':
            # Poisson spiking; rescale so the mean peak activity maps to `rate`
            max_activity = np.mean(np.amax(fs, axis=1))
            # draw poisson samples
            self.Y = np.random.poisson(fs * rate / max_activity)
        else:
            raise NotImplementedError('Synthetic data type not supported.')
        if not overwrite:
            # reset the caches to their previous values
            self.gprefs, self.gs = gprefs_backup, gs_backup
        return self.Y

    def get_data(self):
        '''return the most recently generated activity Y'''
        return self.Y