
Classifying Our Skip Lists

March 26, 2013

We’re going to continue playing around with our simple classification function. For those of you arriving late to the party, the function in question is:

f(x) = \begin{cases} 0 & \mbox{if } x < c-b \\ \frac{x-c+b}{2b} & \mbox{if } c-b \leq x \leq c+b \\ 1 & \mbox{if } x > c+b \end{cases}
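In Python, the piecewise function translates directly; this sketch also guards against a degenerate window where b is effectively zero, as the profiled version later in the post does:

```python
def classify(x, c, b):
    """0 below the ramp, 1 above it, linear in between on [c-b, c+b]."""
    if x < c - b:
        return 0
    elif x > c + b:
        return 1
    else:
        if b > 1e-7:
            return (x - c + b) / (2 * b)
        # b is effectively zero: the ramp collapses to the midpoint.
        return 0.5
```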

We’re trying to apply that function to arrays of data as quickly as possible. In my last post, I focused on using Cython and C extensions to speed up the calculations. This post focuses on an algorithmic optimization. I’m glad to report that the results suggest not going down this road at my job was the right call.

Let’s start with the ending:

%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop

Close to a 40% reduction in run time? Not bad for… more than a hundred lines of code, a few hours of work, and a significant increase in code complexity. Like I said, vindicated. With p_classify, we simply loop over all the data and apply our function; s_classify is optimized to work with our skip list data structure. Still, we’re not giving up after this result. More details at the end of the post, but you could soon be seeing a C++ compiled skip list class in Python. Yeah, I’m excited, too.

First, let’s run a quick line profile of our classify function to make sure that reducing the number of comparisons will do us some good. To bias the results in our favor, we’ll take some randomized data distributed so that observations are classified as 0 or 1 80% of the time. The profile looks like:
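For reference, data with that property is easy to generate. The snippet below is an assumed setup for illustration, not the exact one used for the timings: with c = 0 and b = 1, uniform draws on [-5, 5] fall outside the ramp [c-b, c+b] about 80% of the time, so they classify to exactly 0 or 1.

```python
import random

random.seed(0)  # deterministic for illustration
c, b = 0.0, 1.0  # assumed parameters, not necessarily the post's values
DATA = [random.uniform(-5, 5) for _ in range(1000)]

# Fraction of observations that classify to exactly 0 or 1.
frac_flat = sum(1 for x in DATA if x < c - b or x > c + b) / len(DATA)
```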

Line #      Hits  Time  Per Hit   % Time  Line Contents
==============================================================
     2                                    @profile
     3                                    def classify(x, c, b):
     8      1000  4287      4.3     40.7    if x<c-b:
     9       405   850      2.1      8.1      return 0
    10       595  2233      3.8     21.2    elif x>c+b:
    11       406   856      2.1      8.1      return 1
    12                                      else:
    13       189   422      2.2      4.0      if b>10**-7:
    14       189  1877      9.9     17.8        return (x-c+b)/2/b
    15                                        else:
    16                                          return 0.5

The comparisons end up taking about 60% of the total run time, so we can expect a real return if we play our optimization right. We’ll also cut down the number of times this function has to return a value, since we will have big blocks of ones and zeros. Somewhat surprisingly, the floating point operations don’t take a disproportionate amount of time: the floating point branch is hit about 19% of the time and takes up about 22% of the actual time used. When we’re done, 90% of the time will be spent on the floating point section.

For more detail about skip lists, see the previous post and the links therein. We’re going to jump straight into our algorithm. Due to time constraints, I’m going to keep the discussion of the algorithm at a pretty high level. The first thing we need to do is to augment skip lists to allow us to calculate the number of elements less than or equal to a given value. Basically, what we are going to do is store the number of elements we skip over at each jump and add those up on our way down. Here is some quality ASCII art to illustrate my point:

             x --------- 3 ----- 2 - x
             |           |       |   |
             x - 1 ----- 2 - 1 - 1 - x
             |   |       |   |   |   |
             x - 1 - 1 - 1 - 1 - 1 - x
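Concretely, each node in this augmented structure carries the usual four links plus the skip count. The class below is a minimal sketch whose attribute names follow the code in this post, not the full GitHub implementation:

```python
class Node:
    """Skip-list node; accum is the number of base-level elements
    skipped by the jump that arrives at this node."""
    __slots__ = ("value", "left", "right", "up", "down", "accum")

    def __init__(self, value, accum=1):
        self.value = value
        self.accum = accum
        self.left = self.right = self.up = self.down = None
```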

At each node, we count how many nodes over we are after each jump to the right. That number is stored in the variable accum at each node, and with it we can find the rank of a number in our skip list:

def rank(self, value):
  # Count the elements <= value: walk right while the next jump does not
  # overshoot, accumulating skip counts, and drop down a level otherwise.
  node = self.head
  rank = 0
  while value >= node.value:
    if value < node.right.value:
      if node.down:
        node = node.down
      else:
        break
    else:
      node = node.right
      rank += node.accum
  return rank

All we are doing is moving along each list until we encounter a node containing a value greater than our argument and moving down. Once we can’t move down any more, we return our count of elements less than our argument. The hard part is maintaining the variable accum, but that doesn’t turn out to be all that hard:

def insert(self, value):
  self.n_items += 1
  node = self.head
  # Descend to the base level; any jump that will pass over the new
  # value gets its skip count bumped by one.
  while node.down:
    if value < node.right.value:
      node.right.accum += 1
      node = node.down
    else:
      node = node.right
  while value > node.right.value:
    node = node.right
  # Calling a node inserts a new node immediately to its right.
  node(value)
  node.right.accum = 1
  self._raise_level(node.right)

def _raise_level(self, node, accum=None):
  if accum is None:
    accum = 1
  if self._raise_check():
    # Walk left to the nearest raised node, totaling the skip counts so
    # the new upper-level jump knows how many elements it spans.
    up_check = node.left
    while up_check.up is None and up_check.value > float("-inf"):
      accum += up_check.accum
      up_check = up_check.left
    if up_check.up:
      place_node = up_check.up
      place_node(node.value)
      place_node.right.accum = accum
      place_node.right.right.accum -= accum
      node.up = place_node.right
      place_node.right.down = node
      self._raise_level(place_node.right, accum=accum)
    else:
      # No raised node to our left: start a new top level.
      self._init_level()
      self.head(node.value)
      self.head.right.accum = accum
      node.up = self.head.right
      self.head.right.down = node
      self._raise_level(self.head.right, accum=accum)

If you want to go through the code line by line, I recommend heading over to GitHub. At a high level, we find where to insert the node exactly as before and then do some bookkeeping to make sure all the accum values still add up. Delete does basically the same thing in reverse. Test code is also included on GitHub to confirm everything works properly. Finally, the method find_rank(m) returns a reference to the node with rank m.

That is all we need to implement our skip list classifier function:

def classify_list(rslist, c, b):
  N = rslist.count()
  zeros = rslist.rank(c-b)
  ones = N-rslist.rank(c+b)

  start = rslist.find_rank(zeros+1) if zeros > 0 else None
  end = rslist.find_rank(N-ones+1) if ones > 0 else None

  output = [_classify(val, c, b) for val in 
            rslist.iterate(start=start, stop=end)]
  return [0]*zeros + output + [1]*ones
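As a sanity check on the logic, independent of the skip list itself, the same rank-then-slice idea can be reproduced with the standard library’s bisect module on a plain sorted list, where bisect_right plays the role of rank():

```python
import bisect

def classify(x, c, b):
    # The scalar piecewise function from the start of the post.
    if x < c - b:
        return 0
    if x > c + b:
        return 1
    return (x - c + b) / (2 * b) if b > 1e-7 else 0.5

def classify_sorted(data, c, b):
    """Rank-then-slice on an already-sorted list."""
    n = len(data)
    zeros = bisect.bisect_right(data, c - b)      # elements <= c - b
    ones = n - bisect.bisect_right(data, c + b)   # elements >  c + b
    middle = [classify(x, c, b) for x in data[zeros:n - ones]]
    return [0] * zeros + middle + [1] * ones
```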

Because of the skip list structure, the two rank() and two find_rank() calls are each asymptotically as efficient as binary search, which is O(log N). The iterate() method returns a generator that loops over the base linked list in the usual way. Now, let’s look at how this performs line by line:

Line #      Hits     Time  Per Hit   % Time  Line Contents
==============================================================
    12                                       #@profile
    13                                       def classify_list(rslist, c, b):
    14      1000     1694      1.7      0.1    N = rslist.count()
    15      1000    79401     79.4      4.2    zeros = rslist.rank(c-b)
    16      1000    81666     81.7      4.3    ones = N-rslist.rank(c+b)
    17                                           
    18      1000    23593     23.6      1.3    start = rslist.find_rank(zeros+1) if zeros > 0 else None
    19      1000    24351     24.4      1.3    end = rslist.find_rank(N-ones+1) if ones > 0 else None
    20                                           
    21      1000      782      0.8      0.0    output = [_classify(val, c, b) for val in 
    22    181000  1654586      9.1     88.1    rslist.iterate(start=start, stop=end)]
    23      1000    12238     12.2      0.7    return [0]*zeros + output + [1]*ones

We are now spending almost all of our time in the middle section of the classifier. The generator we return is significantly slower than iterating over a list, but we have greatly reduced the time spent classifying 1s and 0s. Recalling our comparison with the Pythonic classifier:

%timeit [p_classify(x, c, b) for x in DATA]
100 loops, best of 3: 1.59 ms per loop

%timeit s_classify(s_list, c, b)
100 loops, best of 3: 1.01 ms per loop

The 40% reduction in run time is not much compared to the speedup from converting to a compiled version, but it is certainly nothing to scoff at. This would also make sense as an optimization to combine with tools like Cython.

Now, for the teaser: What we really want is to compare our new algorithm with a Cython and a C++ implementation. The Cython implementation should be pretty straightforward, so look for that to come out first. However, I’ve been wanting to create a compiled class for quite a while. Why haven’t I? This. That’s a pretty steep learning curve for something I want to do for the hell of it. I recently learned about Boost.Python and how it can expose C++ classes to Python, and I’m pretty excited to try it out.
