We’re going to continue playing around with our simple classification function. For those of you arriving late to the party, the function in question is:

$f(x) = \begin{cases} 0 & \mbox{if } x < c-b \\ 1 & \mbox{if } x > c+b \\ \frac{x-c+b}{2b} & \mbox{otherwise} \end{cases}$
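
As plain Python, a direct implementation (matching the profiled version further down) looks like:

```python
def classify(x, c, b):
    """Classify observation x against center c with band half-width b."""
    if x < c - b:
        return 0
    elif x > c + b:
        return 1
    else:
        # Interpolate linearly across the band, guarding against b ~ 0.
        if b > 10**-7:
            return (x - c + b) / 2 / b
        else:
            return 0.5
```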

We’re trying to apply that function to arrays of data as quickly as possible. In my last post, I focused on using Cython and C extensions to speed up the calculations. This post focuses on an algorithmic optimization instead. I’m glad to report that, as it turns out, not going down this road at my job was a good call.

%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop


Close to a 40% speedup? Not bad for… more than a hundred lines of code, a few hours of work, and a significant increase in code complexity. Like I said, vindicated. With p_classify, we simply loop over all the data and apply our function; s_classify is optimized to work with our skip list data structure. However, we’re not giving up after this result. More details at the end of the post, but you could soon be seeing a C++ compiled skip list class in Python. Yeah, I’m excited, too.

First, let’s run a quick line profile of our classify function to make sure that reducing the number of comparisons will do us some good. To bias the results in our favor, we’ll use randomized data distributed so that 80% of observations are classified as exactly 0 or 1. The profile looks like:

Line #      Hits  Time  Per Hit   % Time  Line Contents
==============================================================
2                                    @profile
3                                    def classify(x, c, b):
8      1000  4287      4.3     40.7    if x<c-b:
9       405   850      2.1      8.1      return 0
10       595  2233      3.8     21.2    elif x>c+b:
11       406   856      2.1      8.1      return 1
12                                      else:
13       189   422      2.2      4.0      if b>10**-7:
14       189  1877      9.9     17.8        return (x-c+b)/2/b
15                                        else:
16                                          return 0.5


The comparisons end up being about 60% of the total run time, so we’ll see a real return if we play our optimization right. We’ll also be able to cut down the number of times this function has to return a value, since we will have big blocks of ones and zeros. Somewhat surprisingly, the floating point operations don’t take a disproportionate amount of time: the floating point section is hit about 19% of the time and takes up about 22% of the actual time used. When we’re done, 90% of the time is going to be spent on the floating point section.
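
For reference, biased benchmark data like this could be generated along the following lines. This is my own sketch (the function name, distribution, and parameters are assumptions, not the post’s actual setup): roughly 20% of points land inside the (c-b, c+b) band, and the rest split between the hard-0 and hard-1 regions.

```python
import random

def make_data(n, c, b, frac_mid=0.2, seed=0):
    """Hypothetical generator for the benchmark data: about frac_mid of the
    observations fall inside the (c-b, c+b) band, the rest split evenly
    between the regions classified as 0 and 1."""
    rng = random.Random(seed)
    data = []
    for _ in range(n):
        r = rng.random()
        if r < (1 - frac_mid) / 2:
            data.append(c - b - rng.random())          # classified 0
        elif r < 1 - frac_mid:
            data.append(c + b + rng.random())          # classified 1
        else:
            data.append(c - b + 2 * b * rng.random())  # inside the band
    return data
```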

For more detail about skip lists, see the previous post and the links therein. We’re going to jump straight into our algorithm. Due to time constraints, I’m going to keep the discussion of the algorithm at a pretty high level. The first thing we need to do is to augment skip lists to allow us to calculate the number of elements less than or equal to a given value. Basically, what we are going to do is store the number of elements we skip over at each jump and add those up on our way down. Here is some quality ASCII art to illustrate my point:

x --------- 3 ----- 2 - x
|           |       |   |
x - 1 ----- 2 - 1 - 1 - x
|   |       |   |   |   |
x - 1 - 1 - 1 - 1 - 1 - x


At each node, we count how many nodes over we are after each jump to the right. That number is stored in the accum attribute of each node, and with it we can find the rank of a value in our skip list:

def rank(self, value):
    node = self.head  # start at the top-level head sentinel
    rank = 0
    while value >= node.value:
        if value < node.right.value:
            if node.down:
                node = node.down
            else:
                break
        else:
            node = node.right
            rank += node.accum
    return rank


All we are doing is moving along each list until we encounter a node containing a value greater than our argument and moving down. Once we can’t move down any more, we return our count of elements less than our argument. The hard part is maintaining the variable accum, but that doesn’t turn out to be all that hard:

def insert(self, value):
    self.n_items += 1
    node = self.head  # start at the top-level head sentinel
    while node.down:
        if value < node.right.value:
            # The new value lands under this jump, which now spans one more node.
            node.right.accum += 1
            node = node.down
        else:
            node = node.right
    while value > node.right.value:
        node = node.right
    node(value)  # link a new node holding value to the right of node
    node.right.accum = 1
    self._raise_level(node.right)

def _raise_level(self, node, accum=None):
    if accum is None:
        accum = 1
    if self._raise_check():
        up_check = node.left
        while up_check.up is None and up_check.value > float("-inf"):
            accum += up_check.accum
            up_check = up_check.left
        if up_check.up:
            place_node = up_check.up
            place_node(node.value)
            place_node.right.accum = accum
            place_node.right.right.accum -= accum
            node.up = place_node.right
            place_node.right.down = node
            self._raise_level(place_node.right, accum=accum)
        else:
            self._init_level()


If you want to go through the code line by line, I recommend heading over to GitHub. At a high level, we find where to insert the node exactly as before and then do some bookkeeping to make sure all the accum values continue to add up. Delete does basically the same thing in reverse. Test code is also included at GitHub to confirm everything is working properly. Finally, the method find_rank(m) returns a reference to the node with rank m.
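
To sketch how find_rank(m) can ride the same accum bookkeeping, here is a standalone illustration with a bare-bones Node class. This is my own sketch, not the GitHub implementation; the attribute names follow the code above, but head/tail handling is simplified:

```python
import math

class Node:
    """Minimal stand-in for a skip list node: value, accum, right/down links."""
    def __init__(self, value, accum=0):
        self.value = value
        self.accum = accum
        self.right = None
        self.down = None

def find_rank(head, m):
    """Return the node of rank m (the m-th smallest value), starting from the
    top-level head sentinel and descending using the accum counters."""
    node, rank = head, 0
    while True:
        nxt = node.right
        if nxt is not None and rank + nxt.accum <= m:
            rank += nxt.accum   # jump stays at or below rank m: take it
            node = nxt
        elif node.down is not None:
            node = node.down    # jump would overshoot: drop a level
        else:
            return node         # bottom level reached with rank == m
```

On a list shaped like the ASCII art above, find_rank(head, 4) takes the top-level jump (rank 3), drops down, and steps right once more to land on the fourth element.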

That is all we need to implement our skip list classifier function:

def classify_list(rslist, c, b):
    N = rslist.count()
    zeros = rslist.rank(c-b)
    ones = N - rslist.rank(c+b)

    start = rslist.find_rank(zeros+1) if zeros > 0 else None
    end = rslist.find_rank(N-ones+1) if ones > 0 else None

    output = [_classify(val, c, b) for val in
              rslist.iterate(start=start, stop=end)]
    return [0]*zeros + output + [1]*ones


Because of the skip list structure, the two rank() calls and the two find_rank() calls are each asymptotically as efficient as binary search, which is O(log N). The iterate() method returns a generator that loops over the base linked list in the usual way. Now, let’s look at how this performs line by line:

Line #      Hits     Time  Per Hit   % Time  Line Contents
==============================================================
12                                       #@profile
13                                       def classify_list(rslist, c, b):
14      1000     1694      1.7      0.1    N = rslist.count()
15      1000    79401     79.4      4.2    zeros = rslist.rank(c-b)
16      1000    81666     81.7      4.3    ones = N-rslist.rank(c+b)
17
18      1000    23593     23.6      1.3    start = rslist.find_rank(zeros+1) if zeros > 0 else None
19      1000    24351     24.4      1.3    end = rslist.find_rank(N-ones+1) if ones > 0 else None
20
21      1000      782      0.8      0.0    output = [_classify(val, c, b) for val in
22    181000  1654586      9.1     88.1    rslist.iterate(start=start, stop=end)]
23      1000    12238     12.2      0.7    return [0]*zeros + output + [1]*ones


We are now spending almost all of our time dealing with the middle section of the classifier. The generator we return is significantly slower than iterating over a list, but we have greatly reduced the amount of time spent classifying 1s and 0s. Recalling our comparison with the Pythonic classifier:

%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop


The 40% reduction in run time is not much compared to the speedup seen by converting to a compiled version, but it is certainly nothing to scoff at. It would also make sense as an optimization to combine with tools like Cython.

Now, for the teaser: What we really want is to compare our new algorithm with a Cython and a C++ implementation. The Cython implementation should be pretty straightforward, so look for that to come out first. However, I’ve been wanting to create a compiled class for quite a while. Why haven’t I? This. That’s a pretty steep learning curve for something I want to do for the hell of it. I recently learned about Boost.Python and how it can expose C++ classes to Python, and I’m pretty excited to try it out.