# Classifying Our Skip Lists

We’re going to continue playing around with our simple classification function. For those of you arriving late to the party, the function in question is:
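Reconstructed from the line profile below:

```python
def classify(x, c, b):
    """Fuzzy step function: 0 below c - b, 1 above c + b, linear ramp in between."""
    if x < c - b:
        return 0
    elif x > c + b:
        return 1
    else:
        if b > 10**-7:
            return (x - c + b) / 2 / b
        else:
            return 0.5
```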

We’re trying to apply that function to arrays of data as quickly as possible. In my last post, I focused on using Cython and C extensions to speed up the calculations. This post focuses on an algorithmic optimization instead. I’m glad to report that it appears deciding not to go down this road at my job was a good call.

Let’s start with the ending:

```
%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop
```

Close to a 40% speedup? Not bad for… more than a hundred lines of code, a few hours of work, and a significant increase in code complexity. Like I said, vindicated. With p_classify, we are simply looping over all the data and applying our function, while s_classify is optimized to work with our skip list data structure. However, we’re not giving up after this result. More details at the end of the post, but you could soon be seeing a C++ compiled skip list class in Python. Yeah, I’m excited, too.

First, let’s run a quick line profile of our classify function to make sure that reducing the number of comparisons will do us some good. To bias the results in our favor, we’ll take some randomized data distributed so that observations are classified as 0 or 1 80% of the time. The profile looks like:

```
Line #   Hits   Time  Per Hit  % Time  Line Contents
==============================================================
     2                                  @profile
     3                                  def classify(x, c, b):
     8   1000   4287      4.3    40.7      if x < c - b:
     9    405    850      2.1     8.1          return 0
    10    595   2233      3.8    21.2      elif x > c + b:
    11    406    856      2.1     8.1          return 1
    12                                     else:
    13    189    422      2.2     4.0          if b > 10**-7:
    14    189   1877      9.9    17.8              return (x-c+b)/2/b
    15                                         else:
    16                                             return 0.5
```

The comparisons end up being about 60% of the total run time, so we’ll get a real return if we play our optimization right. We’ll also cut down the number of times this function has to return a value, since we will have big blocks of ones and zeros. Somewhat surprisingly, the floating point operations don’t take a disproportionate amount of time: that branch is hit about 19% of the time, and it takes up about 22% of the actual time used. When we’re done, 90% of the time is going to be spent on the floating point section.
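For reference, biased test data along those lines can be generated like this (a sketch: the uniform distribution and the particular c and b are my assumptions; any setup where the fuzzy band covers about 20% of the data works):

```python
import random

random.seed(0)
c, b = 0.5, 0.1  # fuzzy band (c - b, c + b) = (0.4, 0.6) covers 20% of [0, 1)
DATA = [random.random() for _ in range(1000)]

# Roughly 80% of observations land outside the band and classify to a hard 0 or 1.
outside = sum(1 for x in DATA if x <= c - b or x >= c + b)
```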

For more detail about skip lists, see the previous post and the links therein. We’re going to jump straight into our algorithm. Due to time constraints, I’m going to keep the discussion of the algorithm at a pretty high level. The first thing we need to do is to augment skip lists to allow us to calculate the number of elements less than or equal to a given value. Basically, what we are going to do is store the number of elements we skip over at each jump and add those up on our way down. Here is some quality ASCII art to illustrate my point:

```
x --------- 3 ----- 2 - x
|           |       |   |
x - 1 ----- 2 - 1 - 1 - x
|   |       |   |   |   |
x - 1 - 1 - 1 - 1 - 1 - x
```

At each node, we count how many nodes over we are after each jump to the right. That number is stored in the variable accum at each node, and with it we can find the rank of a number in our skip list:

```python
def rank(self, value):
    node = self.head
    rank = 0
    while value >= node.value:
        if value < node.right.value:
            if node.down:
                node = node.down
            else:
                break
        else:
            node = node.right
            rank += node.accum
    return rank
```

All we are doing is moving right along each level until we encounter a node containing a value greater than our argument, then moving down. Once we can’t move down any more, we return our count of elements less than or equal to our argument. The hard part is maintaining the variable accum, but that doesn’t turn out to be all that hard:

```python
def insert(self, value):
    self.n_items += 1
    node = self.head
    while node.down:
        if value < node.right.value:
            node.right.accum += 1
            node = node.down
        else:
            node = node.right
    while value > node.right.value:
        node = node.right
    node(value)
    node.right.accum = 1
    self._raise_level(node.right)

def _raise_level(self, node, accum=None):
    if accum is None:
        accum = 1
    if self._raise_check():
        up_check = node.left
        while up_check.up is None and up_check.value > float("-inf"):
            accum += up_check.accum
            up_check = up_check.left
        if up_check.up:
            place_node = up_check.up
            place_node(node.value)
            place_node.right.accum = accum
            place_node.right.right.accum -= accum
            node.up = place_node.right
            place_node.right.down = node
            self._raise_level(place_node.right, accum=accum)
        else:
            self._init_level()
            self.head(node.value)
            self.head.right.accum = accum
            node.up = self.head.right
            self.head.right.down = node
            self._raise_level(self.head.right, accum=accum)
```

If you want to go through the code line by line, I recommend heading over to GitHub. At a high level, we find where to insert the node exactly as before and then do some bookkeeping to make sure all the accum values continue to add up. Delete does basically the same thing in reverse. Test code is also included on GitHub to confirm everything is working properly. Finally, the method find_rank(m) returns a reference to the node with rank m.
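To sketch how find_rank can use the accum counts, here is a hand-built miniature matching the diagram above (the Node attributes and the values 3, 7, 12, 18, 25 are my own invention; the real class lives on GitHub):

```python
INF = float("inf")

class Node:
    """Hypothetical skip-list node: a value, an accum count, right/down links."""
    def __init__(self, value, accum=0):
        self.value, self.accum = value, accum
        self.right = self.down = None

def link_right(nodes):
    for a, b in zip(nodes, nodes[1:]):
        a.right = b

# The three levels from the diagram, sentinels at -inf and +inf.
base = [Node(-INF)] + [Node(v, 1) for v in (3, 7, 12, 18, 25)] + [Node(INF, 1)]
mid = [Node(-INF), Node(3, 1), Node(12, 2), Node(18, 1), Node(25, 1), Node(INF, 1)]
top = [Node(-INF), Node(12, 3), Node(25, 2), Node(INF, 1)]
for level in (base, mid, top):
    link_right(level)
base_by_val = {n.value: n for n in base}
for n in mid[1:-1]:
    n.down = base_by_val[n.value]
mid_by_val = {n.value: n for n in mid}
for n in top[1:-1]:
    n.down = mid_by_val[n.value]
top[0].down, mid[0].down = mid[0], base[0]

def find_rank(head, m):
    """Descend from the top-level head, spending rank on each rightward move."""
    node, count = head, 0
    while True:
        r = node.right
        if r.value < INF and count + r.accum <= m:
            node, count = r, count + r.accum
        elif node.down is not None:
            node = node.down
        else:
            return node
```

Each rightward move adds that node's accum to the running count; we only move right when doing so doesn't overshoot the target rank m, otherwise we drop down a level.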

That is all we need to implement our skip list classifier function:

```python
def classify_list(rslist, c, b):
    N = rslist.count()
    zeros = rslist.rank(c-b)
    ones = N - rslist.rank(c+b)

    start = rslist.find_rank(zeros+1) if zeros > 0 else None
    end = rslist.find_rank(N-ones+1) if ones > 0 else None

    output = [_classify(val, c, b) for val in
              rslist.iterate(start=start, stop=end)]
    return [0]*zeros + output + [1]*ones
```

Because of the skip list structure, the two rank() calls and two find_rank() calls are each asymptotically as efficient as binary search, which is O(log N). The iterate() method returns a generator which loops over the base linked list in the usual way. Now, let’s look at how this performs line by line:
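As a sanity check on the algorithm, the same idea can be sketched on a plain sorted Python list, with bisect_right standing in for the skip list’s rank and find_rank (the names classify and classify_sorted here are mine):

```python
from bisect import bisect_right

def classify(x, c, b):
    # Same fuzzy step function as before.
    if x < c - b:
        return 0
    elif x > c + b:
        return 1
    return (x - c + b) / 2 / b if b > 10**-7 else 0.5

def classify_sorted(data, c, b):
    """classify_list on a sorted plain list; bisect_right plays the role of rank."""
    N = len(data)
    zeros = bisect_right(data, c - b)      # elements <= c - b classify to 0
    ones = N - bisect_right(data, c + b)   # elements >  c + b classify to 1
    middle = [classify(x, c, b) for x in data[zeros:N - ones]]
    return [0] * zeros + middle + [1] * ones

data = sorted([0.1, 0.45, 0.5, 0.55, 0.9])
assert classify_sorted(data, 0.5, 0.1) == [classify(x, 0.5, 0.1) for x in data]
```

The asserted equality is the point: the blocks of hard 0s and 1s are produced by two rank lookups instead of one comparison per element, and only the fuzzy middle slice is classified individually.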

```
Line #    Hits     Time  Per Hit  % Time  Line Contents
==============================================================
    12                                    #@profile
    13                                    def classify_list(rslist, c, b):
    14    1000     1694      1.7     0.1      N = rslist.count()
    15    1000    79401     79.4     4.2      zeros = rslist.rank(c-b)
    16    1000    81666     81.7     4.3      ones = N-rslist.rank(c+b)
    17
    18    1000    23593     23.6     1.3      start = rslist.find_rank(zeros+1) if zeros > 0 else None
    19    1000    24351     24.4     1.3      end = rslist.find_rank(N-ones+1) if ones > 0 else None
    20
    21    1000      782      0.8     0.0      output = [_classify(val, c, b) for val in
    22  181000  1654586      9.1    88.1                rslist.iterate(start=start, stop=end)]
    23    1000    12238     12.2     0.7      return [0]*zeros + output + [1]*ones
```

We are now spending almost all of our time dealing with the middle section of the classifier. The generator is significantly slower than iterating over a list, but we have greatly reduced the time spent classifying the 0s and 1s. Recalling our comparison with the Pythonic classifier:

```
%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop
```

The 40% reduction in run time is not much compared to the speedup we saw from converting to a compiled version, but it is certainly nothing to scoff at. It would also make sense as an optimization to combine with tools like Cython.

Now, for the teaser: What we really want is to compare our new algorithm with a Cython and a C++ implementation. The Cython implementation should be pretty straightforward, so look for that to come out first. However, I’ve been wanting to create a compiled class for quite a while. Why haven’t I? This. That’s a pretty steep learning curve for something I want to do for the hell of it. I recently learned about Boost.Python and how it can expose C++ classes to Python, and I’m pretty excited to try it out.