#revision october 9, 2019 Alden Bradford
import numpy as np
def w(x, axis = -1, threshold = None):
    '''
    Compute the fraction of variance not explained by a simple thresholding.
    x is a numpy array holding the data on which W should be computed.
    axis only works with its default value currently.
    If threshold is given, the split is fixed at that value instead of
    searching for the best one; on that path x is treated as one-dimensional
    (the comparison count is summed over all axes).
    Returns 1 - (explained variance)/(total variance), so smaller values
    indicate better two-cluster structure.
    '''
    sorted_x = np.sort(x, axis=axis)
    varlist = explained_variance_list(sorted_x, axis=axis)
    if threshold is None:
        best_variance = varlist.max(axis = axis)
    else:
        # Number of points strictly below the threshold; those form the
        # lower cluster of the split.
        index = (sorted_x < threshold).sum()
        if 0 < index < sorted_x.shape[axis]:
            best_variance = varlist[index-1]
        else:
            # Degenerate threshold: every point falls in a single cluster,
            # so the split explains no variance.  (Previously index == 0
            # wrapped around to varlist[-1] and index == N raised an
            # IndexError.)
            best_variance = 0.0
    return 1 - best_variance/sorted_x.var(axis=axis)
def explained_variance_list(x, axis = -1):
    '''
    Find the amount of variance explained by thresholding at each of the possible
    positions along x.
    NOTE: x needs to be sorted.
    Returns an array with N-1 entries along axis; entry k-1 is the
    between-cluster variance obtained by placing the first k points in one
    cluster and the remaining N-k in the other.
    '''
    N = x.shape[axis]
    truncated = np.take(x,range(N-1),axis=axis)
    s1 = np.cumsum(truncated,axis = axis)
    # Promote integer accumulators to float: otherwise the in-place true
    # division at the end cannot be cast back into an integer array and
    # raises a TypeError.  Floating dtypes are left untouched.
    if not np.issubdtype(s1.dtype, np.floating):
        s1 = s1.astype(float)
    s2 = np.expand_dims(x.sum(axis=axis),axis=axis) - s1
    n1 = np.arange(1,N)  # points at or below each candidate split
    n2 = N - n1          # points above it
    # I really would have liked to put it this way, because it is more readable:
    # return (s1*n2-s2*n1)**2 /(n1*n2*N*N)
    # however, this ends up as a significant bottleneck for memory.
    # a better solution is the following in-place computation.
    s1 *= n2
    s2 *= n1
    s1 -= s2
    np.square(s1, out=s1)
    # The 1.0* forces the denominator to float, avoiding integer overflow
    # in n1*n2*N*N for large N.
    s1 /= 1.0*n1*n2*N*N
    return s1
def separate(data, direction, threshold_given = None):
    '''
    Return a boolean mask splitting data at a threshold along direction.
    To make masks comparable, the result is normalized so that its first
    entry is always True.
    If threshold_given is supplied it is used directly; otherwise the best
    threshold is computed from the data.
    '''
    projection = np.inner(data, direction)
    if threshold_given is None:
        threshold_given = threshold(data, direction)
    mask = projection <= threshold_given
    # Flip the mask when needed so the first point sits in the True cluster.
    return mask if mask[0] else ~mask
def threshold(data, direction):
    r'''
    Pick the threshold value maximizing the explained variance of the data
    projected onto direction.
    NOTE: the chosen hyperplane is described mathematically as
    $ x \cdot direction = threshold $.
    '''
    projection = np.sort(np.inner(data, direction))
    split = explained_variance_list(projection).argmax()
    # Midpoint between the last point of the lower cluster and the first
    # point of the upper one.
    return (projection[split] + projection[split + 1]) / 2
def delta(scores, data, direction):
    '''
    Absolute difference between the mean score of each of the two clusters
    produced by separate(); used to validate data.
    '''
    in_first = separate(data, direction)
    return abs(scores[in_first].mean() - scores[~in_first].mean())
def n_tarp(data, n, times=1):
    '''
    Perform n-TARP clustering on data, where the last axis of data holds the
    coordinates of each point.
    n random directions are drawn and the one whose projected data has the
    smallest W (best two-cluster structure) wins.  Since the clusters are
    fully determined by the data and a direction, only the winning direction
    vector(s) are returned.
    The optional parameter times repeats the whole procedure in a vectorized
    fashion.  This is good for testing purposes, but if a better projection
    is all that is desired it is better to simply increase n.
    '''
    candidates = np.random.randn(times, n, data.shape[-1])
    scores = w(np.inner(candidates, data))
    winners = np.argmin(scores, axis=-1)[..., np.newaxis, np.newaxis]
    return np.take_along_axis(candidates, winners, axis=1).squeeze()
#def validity_test(validation_data, threshold, direction, null_model_size=100_000):
# null_data = np.random.randn(null_model_size, validation_data.shape[0])
# null_w = w(null_data)
# projected = np.inner(validation_data,direction)
# alt_w = w(projected, threshold=threshold)
# p = (null_w < alt_w).mean() +1/null_model_size
# return {'p':p, 'w':alt_w}
#NOTE: the memoized version below works more quickly for repeated trials, because it takes a long time
#to generate random numbers.
# Cache of null-model W values keyed by (null_model_size, dimension);
# generating the random numbers dominates the cost of repeated trials.
_validity_test_memo = {}
def validity_test(validation_data, threshold, direction, null_model_size=100_000):
    '''
    Assess a split against a random-normal null model.
    Returns a dict with 'w', the W statistic of validation_data projected on
    direction and split at threshold, and 'p', the fraction of null-model
    draws with smaller W (plus 1/null_model_size so p is never zero).
    '''
    memo_key = (null_model_size, validation_data.shape[0])
    try:
        null_w = _validity_test_memo[memo_key]
    except KeyError:
        null_data = np.random.randn(null_model_size, validation_data.shape[0])
        null_w = _validity_test_memo[memo_key] = w(null_data)
    projected = np.inner(validation_data, direction)
    alt_w = w(projected, threshold=threshold)
    p = (null_w < alt_w).mean() + 1/null_model_size
    return {'p': p, 'w': alt_w}