import copy import numpy import bisect import math def N_cube(L,K): if K > 0: return (2*K+1)**L - (2*K-1)**L #Whole volume minus interior = surface else: return 1 ##### PYRAMID CODING ##### #This section deals with Pyramid Vector Quantization. All algorithms # and variable names are taken from "A Pyramid Vector Quantizer" by # Thomas R. Fischer: # http://ieeexplore.ieee.org/search/srchabstract.jsp?arnumber=1057198&isnumber=22755&punumber=18&k2dockey=1057198@ieeejrns&query=(pyramid+vector+%3Cin%3E+metadata)+%3Cand%3E+(18+%3Cin%3E+punumber)&pos=2&access=no def N_pyramid(L,K): #Direct from the paper # This implementation runs in exponential time. With dynamic programming it # can be made to go in O(LK) (or even faster?). if K == 1: return 2*L elif L == 1: return 2 else: return N_pyramid(L-1, K) + N_pyramid(L-1, K-1) + N_pyramid(L, K-1) def N_pyramid_dynamic(L,K): store = Tabulate_N_pyramid(L,K) return store[L,K] def Tabulate_N_pyramid(L,K): #Tabulates all values of N(L,K) up to the requested values. #returns a table store = numpy.zeros((L+1,K+1), dtype=numpy.object) #use python's arbitrary-precision arithmetic store[:,0] = 1 #store[1,1:] = 2 #store[1:,1] = range(2,2*L+1,2) for i in xrange(1,L+1): for j in xrange(1,K+1): store[i,j] = store[i-1,j] + store[i-1,j-1] + store[i, j-1] return store def PVQ(x,K): """This function performs quantization on a real vector x with quantization "diameter" K. x need not be normalized in advance.""" s = sum(abs(xi) for xi in x) x_tilde = [xi*(K/s) for xi in x] d = sum(abs(xi) for xi in x_tilde) y_tilde = [int(round(xi)) for xi in x_tilde] err = [a-b for (a,b) in zip(y_tilde, x_tilde)] d = sum(abs(yi) for yi in y_tilde) if d != K: f = K - d if f < 0: #sum is too large, so find positive errors and dec them poserr = [] L = len(y_tilde) for i in range(L): if y_tilde[i]*err[i] > 0: poserr.append((abs(err[i]),cmp(err[i],0),i)) q = sorted(poserr) for (abserr, sgnerr, ind) in q[f:]: y_tilde[ind] -= sgnerr else: #sum is too small, so find negative errors and dec them negerr = [] L = len(y_tilde) for i in range(L): if y_tilde[i]*err[i] < 0: negerr.append((abs(err[i]),cmp(err[i],0),i)) q = sorted(negerr) for (abserr, sgnerr, ind) in q[-f:]: y_tilde[ind] -= sgnerr d = sum(abs(yi) for yi in y_tilde) return y_tilde def encode_pyramid(y): """Given a properly quantized input vector y, this function assigns it to a unique integer. (L and K are deduced from y.)""" b = 0 i = 0 k = sum(abs(yi) for yi in y) l = len(y) N = Tabulate_N_pyramid(l-1,k) while k > 0: v = abs(y[i]) sgn = cmp(y[i],0) if v == 1: b = b + N[l-1,k] + ((1-sgn)/2)*N[l-1,k-1] elif v > 1: b = b + N[l-1,k] + 2*sum(N[l-1,(k+1-v):k]) + ((1-sgn)/2)*N[l-1,k-v] k -= v l -= 1 i += 1 return b def decode_pyramid(b,L,K): """Inverse of encode_pyramid. Note that L and K cannot be deduced, so they must be supplied directly.""" x = numpy.zeros((L,),dtype=numpy.object) i = 0 xb = 0 k = K l = L N = Tabulate_N_pyramid(L-1,K) while k > 0: if b == xb: #(1) #x[i] = 0 print(i) x[L-1] = k - abs(x[i]) #(5) k = 0 elif b < xb + N[l-1,k]: #(2a) #x[i] = 0 k -= abs(x[i]) #(4) l -= 1 i += 1 else: xb += N[l-1,k] #(2b) #x[i] is not zero, so skip those possibilities j = 1 while b >= xb + 2*N[l-1,k-j]: #(3), polarity reversed print(b,xb,l,k,j) xb += 2*N[l-1,k-j] #(3b) j += 1 if b < xb + N[l-1,k-j]: #(3a) x[i] = j else: x[i] = -j #WARNING: the following line is not in the original PVQ paper. #However, I believe that it is an omission from the paper, # and correct here. xb += N[l-1,k-j] k -= abs(x[i]) #(4) l -= 1 i += 1 return x ##### OCTAGON CODING ###### #The following functions reproduce the corresponding PVQ functions, #repurposed for OVQ. When b=0, each of these functions should produce #identical output to the corresponding PVQ function. #Mathematically an "octagon" is parameterized by a and b. A vector # v in R^L is on the octagon iff sum(max(0,abs(elt)-b) for elt in v) == a # Clearly, then, if b==0, then sum(abs(elt) for elt in v) == a, which is a "pyramid". # If a ==0, then every element must be <=b, and we have a filled hypercube. def N_octagon_dynamic(L,a,b): store = Tabulate_N_octagon(L,a,b) return store[L,a] def Tabulate_N_octagon(L,a,b): store = numpy.zeros((L+1,a+1), dtype=numpy.object) #use python's arbitrary-precision arithmetic store[0,0] = 1 for i in xrange(1,L+1): store[i,0] = (2*b + 1)*store[i-1,0] for j in xrange(1,a+1): store[i,j] = store[i, j-1] + (2*b + 1)*store[i-1,j] + (1-2*b)*store[i-1,j-1] return store def encode_octagon(y,b): num = 0 i = 0 a = sum(max(0,abs(yi)-b) for yi in y) L = len(y) l = L N = Tabulate_N_octagon(l-1,a,b) while i < L: v = abs(y[i]) if v > 0: sgn = cmp(y[i],0) if v <= b: num += (2*v - 1 + (1-sgn)/2)*N[l-1,a] else: e = v - b num += (2*b + 1)*N[l-1,a] + 2*sum(N[l-1,(a+1-e):a]) + ((1-sgn)/2)*N[l-1,a-e] a -= e l -= 1 i += 1 return num def decode_octagon(num,L,a,b): x = numpy.zeros((L,),dtype=numpy.object) i = 0 xb = 0 l = L N = Tabulate_N_octagon(L-1,a,b) while i < L: if num == xb: #(1) #x[i] = 0 print(i) if a > 0: x[L-1] = b + a #(5) #else: # x[L-1] = 0 i = L #Terminate the loop elif num < xb + N[l-1,a]: #(2a) #x[i] = 0 l -= 1 i += 1 else: xb += N[l-1,a] #(2b) #x[i] is not zero, so skip those possibilities j = 1 e = max(0,j - b) while num >= xb + 2*N[l-1,a-e]: #(3), polarity reversed print(num,xb,l,a,j,e) xb += 2*N[l-1,a-e] #(3b) j += 1 e = max(0,j - b) if num < xb + N[l-1,a-e]: #(3a) x[i] = j else: x[i] = -j xb += N[l-1,a-e] #WARNING: This line is not in the paper! BUG!!! a -= e #(4) l -= 1 i += 1 return x def OVQ(x,a,b): L = len(x) y = sorted(map(abs,x),reverse=True) #exists solely to compute the scaling factor s = y[0] i = 1 #Derived as follows: compute the scale factor if the first i sum to a: # s/f - b*i = a -> f = (a + b*i)/s #If the (i+1)th element abs(y[i])/f <= b, then we are done. Otherwise, continue. while i < L and s*b < (a + b*i)*y[i]: s += y[i] i += 1 f = float(a + b*i)/s x_tilde = [xi*f for xi in x] #upper + lower == x_tilde #sum(map(abs,upper)) == a lower = [cmp(xi,0)*min(b,abs(xi)) for xi in x_tilde] lower_quantized = [int(round(xi)) for xi in lower] #scalar quantize if a > 0: upper = [cmp(xi,0)*max(0,abs(xi)-b) for xi in x_tilde] upper_quantized = PVQ(upper,a) total = map(sum,zip(upper_quantized, lower_quantized)) return total else: return lower_quantized ########## Octagon coding test functions ################ #The following functions are to be used for testing the above octagon #coding functions. def OVQ_errors_random(L,a,b,num_trials): """This function computes num_trials random vectors in L dimensions, quantizes them with OVQ (with parameters a and b), and returns a tuple consisting of the RMS error, the Mean Absolute Error, the maximum observed absolute error, and the vector associated with maximum error.""" sumsq = 0 sumabs = 0 maxerr = 0 maxvec = [] for i in xrange(num_trials): x = numpy.random.normal(size=(L,)) x = x / math.sqrt(numpy.dot(x,x)) y = OVQ(x,a,b) y = numpy.array(y,dtype=numpy.float64) y = y / math.sqrt(numpy.dot(y,y)) delta = y - x d2 = numpy.dot(delta,delta) sumsq += d2 d = math.sqrt(d2) sumabs += d if maxerr < d: maxerr = d maxvec = x return (math.sqrt(sumsq/num_trials),sumabs/num_trials,maxerr,maxvec) def octagon_radius_ratio(L,alpha,beta): outer = octagon_outer_radius(L,alpha,beta) inner = octagon_inner_radius(L,alpha,beta) return outer/inner def octagon_outer_radius(L,alpha,beta): return math.sqrt((alpha+beta)**2 + (L-1)*(beta**2)) def octagon_inner_radius(L,alpha,beta): inner = float('inf') for i in range(1,N+1): r2 = i*((beta + float(alpha)/i)**2) inner = min(inner,r2) return math.sqrt(inner) def octagon_max_error(L,alpha,beta): """This function computes the approximate maximum quantization error in L dimensions given parameters alpha and beta. (alpha and beta replace a and b here, to indicate that, in principle, this approximation applies to any real values of alpha and beta. For actual quantization, a and b are constrained to be integer.""" err = 0 # i is the number of coordinates that sum to a constant # when i==1, we are considering a face that is shared with a cube # when i==N, we are considering a face that is shared with a "pyramid" for i in range(1,L+1): #r2 is the minimum radius-squared for a "face" in which i # dimensions are constrained. r2 = i*((beta + float(alpha)/i)**2) #rect2 is squared max error possible on a uniform cubic lattice # in N-i dimensions. rect2 = float(L-i)/4 #pyr2 is the squared max error possible on a pyramid-face lattice #using i dimensions (i-1 degrees of freedom) #note that for i==1, pyr2==0 c = i//2 pyr2 = float((i*i*c)-(c*c*i))/(i*i) #max d**2 along the angled surface #err is the pythagorean sum of rect and pyr, divided by the scaling #necessary to project them onto the unit sphere thiserr = math.sqrt((rect2 + pyr2)/r2) err = max(err, thiserr) return err def optimum_b(N,K): minb = 0 minv = octagon_radius_ratio(N,K,0) for b in range(1,K+1): v = octagon_radius_ratio(N,K-b,b) if v < minv: minv = v minb = b return minb def recommend_ab(L,maxN,maxb=float('inf')): """This function scans through all possible values of a and b (b<=maxb if specified) that can be coded in log2(maxN) bits. It returns the choice of a and b that minimizes the octagon_max_error function. By setting maxb=0, one may get the largest PVQ for N <= maxN.""" b = -1 besterr = float('inf') bestb = -1 besta = 0 store = numpy.zeros((L+1,2), dtype=numpy.object) #use python's arbitrary-precision arithmetic a = 2 #dummy value while a > 1 and b < maxb: b += 1 print(b) a = 0 #the value of a corresponding to store[:,1] store[0,1] = 1 #actually L=0,a=0 for i in xrange(1,L+1): store[i,1] = (2*b + 1)*store[i-1,1] while a==0 or store[L,1] <= maxN: store[:,0] = store[:,1] store[0,1] = 0 for i in xrange(1,L+1): store[i,1] = store[i, 0] + (2*b + 1)*store[i-1,1] + (1-2*b)*store[i-1,0] a += 1 a -= 1 print(a,b,store[L,0]) if a == 0 and b == 0: return (0,0) elif a >= 0: curr_err = octagon_max_error(L,a,b) if curr_err < besterr: besterr = curr_err besta = a bestb = b return (besta,bestb) ######## EVERYTHING BELOW THIS POINT IS DEPRECATED. def N_hybrid(L,K,m): # m is the duality parameter. # When m=1, we have a hypercube. # When m=L, we have a dual-hypercube ("pyramid"). # The best approximation to a sphere occurs at m = sqrt(L) return _hyb(L,K,m,[]) #recursive helper function def _hyb(L,K,m,s): # s is the list (ideally the multiset) of values that have been seen so far # This function runs in exponential time (and slow!). if len(s) == L-1: #Terminate the recursion if sum(abs(x) for x in mu(m,s)) == K: # The pyramid constraint is already satisfied, so this final # dimension is (A) a "cube dimension" or (B) The last element of the pyramid and constrained to be exactly zero (Max_allowed = 0) return 2*Max_allowed(K,m,s) + 1 elif Max_allowed(K,m,s) == 0: # We're at some kind of corner or edge, and the final dimension # component is constrained to be exactly zero, so there is only 1 # option. return 1 else: # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be Max_allowed(K,m,s) or -Max_allowed(K,m,s). 2 options. return 2 else: tot = 0 for i in Allowed_range(K,m,s): c = copy.copy(s) c.append(i) tot += _hyb(L,K,m,c) return tot def Allowed_range(K,m,s): b = Max_allowed(K,m,s) return xrange(-b,b+1) def mu(m,s): if len(s) <= m: return s else: smu = sorted(s,key=abs) return smu[len(smu)-m:] def Max_allowed(K,m,s): return K - sum(abs(x) for x in mu(m-1,s)) def _hyb_dynamic(L,K,m,muabs,d,memo): if d == L-1: #Terminate the recursion if len(muabs) == m: if sum(muabs) == K: # The pyramid constraint is already satisfied, so this final # dimension is a "cube dimension". return 2*muabs[0] + 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 else: #len(muabs) == m-1 if sum(muabs) == K: # We're at some kind of corner or edge, and the final dimension # component is constrained to be exactly zero, so there is only 1 # option. return 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 else: tot = 0 b = K - sum(muabs[max(0,len(muabs) + 1 - m):]) for i in xrange(-b,b+1): t = list(muabs) t.append(abs(i)) t = tuple(sorted(t)[-m:]) #selects the m greatest elements, or all elements if there are fewer than m if (t,d+1) in memo: q = memo[(t,d+1)] else: q = _hyb_dynamic(L,K,m,t,d+1,memo) memo[(t,d+1)] = q tot += q return tot def _hyb_dynamic2(L,K,m,muabs,d,memo): #avoids hitting memo cache unnecessarily by a factor of 2. Saves about a factor of 2 in execution time. if d == L-1: #Terminate the recursion if len(muabs) == m: if sum(muabs) == K: # The pyramid constraint is already satisfied, so this final # dimension is a "cube dimension". return 2*muabs[0] + 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 else: #len(muabs) == m-1 if sum(muabs) == K: # We're at some kind of corner or edge, and the final dimension # component is constrained to be exactly zero, so there is only 1 # option. return 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 else: tot = 0 b = K - sum(muabs[max(0,len(muabs) + 1 - m):]) for i in xrange(0,b+1): t = list(muabs) t.append(i) t = tuple(sorted(t)[-m:]) #selects the m greatest elements, or all elements if there are fewer than m if (t,d+1) in memo: q = memo[(t,d+1)] else: q = _hyb_dynamic2(L,K,m,t,d+1,memo) memo[(t,d+1)] = q if i == 0: tot += q else: tot += 2*q return tot def _hyb_dynamic3((L,K,m,muabs,d),memo): #shrink L,K,m,muabs by subtracting elements that we know will not change henceforth. #print (L,K,m,muabs,d) if m == 1: #print("terminate, m==1") if len(muabs) > 0 and muabs[0] == K: # The face is already known, so all remaining parameters are free return (2*K + 1)**(L-d) # L-d parameters remain, each ranging from -K to K else: # the face has not yet been found return N_cube(L-d,K) elif K == 1: #Only one dimension can be nonzero if K=1 and m > 1. #print("terminate, k==1") if len(muabs) > 0 and muabs[-1] == 1: # The nonzero dimension has already been selected return 1 #All remaining entries must be 0 else: #The nonzero dimension has not yet been selected return 2*(L-d) #There are L-d dimensions remaining, and whichever one is chosen can be + or -. elif d == L-1: #Terminate the recursion, last dimension #print("terminate, d==L-1") if len(muabs) == m: if sum(muabs) == K: # The pyramid constraint is already satisfied, so this final # dimension is a "cube dimension". return 2*muabs[0] + 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 else: #len(muabs) == m-1 if sum(muabs) == K: # We're at some kind of corner or edge, and the final dimension # component is constrained to be exactly zero, so there is only 1 # option. return 1 else: #sum(muabs) < K # The pyramid constraint is not yet satisfied, so the value of this # dimension is constrained to be B(K,m,s) or -B(K,m,s). 2 options. return 2 elif sum(muabs) == K: #print("terminate, sum(muabs)==K") if len(muabs) < m: #The remainder of muabs will have to be filled with zeros return 1 else: #len(muabs) == m return (2*muabs[0] + 1)**(L-d) elif L == m: #We are in the "pyramid" case #print("transfer recursion, L==m") s = K - sum(muabs) if s > 0: return N_pyramid_dynamic(L-d,s) #Possible FIXME: should this be done internally to preserve memo? else: return 1 elif len(muabs) > 0 and muabs[-1]*m >= K: #muabs[-1] is a permanent member of muabs #print("recurse, muabs[-1]*m>=K") a = (L-1, K-muabs[-1], m-1, muabs[:-1], d-1) if a in memo: return memo[a] else: q = _hyb_dynamic3(a, memo) memo[a] = q return q else: #print("bruteforce") a = (L-1, K, m, muabs, d) #This handles the zero case if a in memo: tot = memo[a] else: tot = _hyb_dynamic3((L-1, K, m, muabs, d),memo) memo[a] = tot b = K - sum(muabs[max(0,len(muabs) + 1 - m):]) for i in xrange(1,b+1): #scan through the positive options t = list(muabs) bisect.insort(t,i) t = tuple(t[-m:]) #selects the m greatest elements, or all elements if there are fewer than m #t.append(i) #t = tuple(sorted(t)[-m:]) a = (L, K, m, t, d+1) if a in memo: q = memo[a] else: q = _hyb_dynamic3(a,memo) memo[a] = q tot += 2*q #we considered only the positive options, so double-count for + and - return tot def N_hybrid_dynamic(L,K,m): return _hyb_dynamic3((L,K,m,(),0),{}) #recursive helper function #N_hybrid_dynamic(20,20,18)