Stream de-duplication

I’ve recently started playing around with NSQ, in my hunt for a good, resilient, highly available, guaranteed “at least once” delivery queue. That’s a lot of adjectives, but basically it boils down to a queue that puts a copy of each message on N nodes and can keep operating (without losing messages) with any X of them failing, where obviously X < N.

NSQ attacks this problem in an interesting way. Instead of trying to form a cluster (in the sense that, say, RabbitMQ does), it treats each `nsqd` instance as a separate entity; only the clients, and the directory service `nsqlookupd`, know that there is more than one of them. This actually makes it very reliable, in the sense that there are no troublesome master/slave relationships to preserve and no leaders to elect.

This simplicity forces some of the work back on the client.

  • NSQ guarantees “at least once” delivery, rather than “exactly once”; hence subscribers should operate in an idempotent way
  • when using replication, it is up to the client to de-duplicate the messages on subscription

De-duplication

To de-duplicate, a subscriber needs to determine whether it has seen a message before. Doing this accurately would mean storing every message ID, or some digest of the message content, in a large hash table. With this we could simply test:

if (message is in hash map) {
    ignore
}
process

Then we just need to make sure we add each message we see to the hash map. With a lossless hash map (i.e. one that stores everything), this is going to use unbounded memory.
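As a rough illustration (just a sketch, not nsqphp code; the handler and the message IDs are made up), a PHP subscriber callback could track everything it has processed in an in-memory array:

<?php
// Naive de-duplication: remember the ID of every message ever processed.
// Correct, but the $seen array grows without bound.
$seen = array();

$handle = function ($id, $payload) use (&$seen) {
    if (isset($seen[$id])) {
        return;                    // duplicate – ignore it
    }
    $seen[$id] = true;             // record that we have now seen this message
    echo "processing {$id}\n";     // stand-in for real work
};

$handle('abc123', 'some payload');
$handle('abc123', 'some payload'); // second call is silently dropped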

The opposite of a Bloom filter

Bloom filters were my first thought when trying to come up with a way of bounding memory. A Bloom filter is a probabilistic data structure that can test whether an element is a member of a set. It will never tell you an item is _not_ in the set when it actually is (no false negatives), but it may tell you an item is in the set when really it isn’t (a chance of false positives).

What I actually want is _the opposite_ of a Bloom filter.

http://lmgtfy.com/?q=opposite+of+a+bloom+filter

So, picking the first link on Google, I checked out the blog post on somethingsimilar.com. @jmhodges’s solution is simple: use a fixed-size hash map and simply overwrite entries on collision. Let’s go through that slowly.

Here’s our hash map, with 10 buckets:

[Figure: our empty hash buckets]

Now we process our first message and add it:

[Figure: our content hashes to bucket 3]

To test whether some new message has been seen, we check whether the appropriate bucket contains exactly this message’s content. If the content matches, we can be sure we’ve seen it. If it does not match, we cannot know either way: we may simply have overwritten that earlier message with a newer one that hashed to the same bucket.

So now we write in our next message, and it hashes to the same bucket. At this point we’ve lost our knowledge of having ever seen the first message we processed.

[Figure: the next item also hashes to bucket 3; we have now lost knowledge of having seen the previous item]
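Here’s a minimal sketch of that structure in PHP – my own illustration of the idea rather than the nsqphp code (the class and method names are made up):

<?php
// An "opposite of a Bloom filter": a fixed-size array of buckets where each
// new entry simply overwrites whatever previously hashed to the same bucket.
class OppositeOfBloomFilter
{
    private $buckets;
    private $size;

    public function __construct($size = 10)
    {
        $this->size = $size;
        $this->buckets = array_fill(0, $size, null);
    }

    private function bucketFor($content)
    {
        // map the content onto one of the N buckets
        return hexdec(substr(md5($content), 0, 8)) % $this->size;
    }

    // TRUE means we have definitely seen exactly this content before;
    // FALSE means we cannot know (any earlier entry may have been overwritten)
    public function hasSeen($content)
    {
        return $this->buckets[$this->bucketFor($content)] === $content;
    }

    public function add($content)
    {
        $this->buckets[$this->bucketFor($content)] = $content;
    }
}

With only 10 buckets, adding another message that happens to hash to the same bucket makes hasSeen() return FALSE for the first message again – exactly the loss of knowledge shown above.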

Deciding how big to make it

So with this data structure we will lose knowledge of messages we have seen; however, we can control how quickly that happens by choosing the size of our hash map (how many buckets we have).

Intuitively, there is a trade-off between the amount of space used and our ability to detect duplicates. At one extreme, with a single bucket, we can only ever catch a duplicate that arrives immediately after the original. At the other extreme, with a huge number of buckets, we can nearly always de-duplicate (bounded only by our hash function’s ability to produce distinct values for different content).

To get a clearer picture, we can consider our implementation in terms of probability. Starting with a single message stored, the probability of overwriting this message with the next message (assuming a perfectly random hash function) is 1/N, where N is the number of buckets.

P(overwritten on the 1st go) = 1/N

On our next go, the chance of overwriting on exactly that go is:

P(overwritten on exactly the 2nd go) = (1 - 1/N) * (1/N)

This combines the probability of _not_ having overwritten on the first go with the probability of overwriting this time. To get the probability of having overwritten by this go, we simply add the two:

P(overwritten by the 2nd go) = 1/N + (1 - 1/N) * (1/N)

Our next go looks like this:

P(overwritten by the 3rd go) = 1/N + (1 - 1/N) * (1/N) + (1 - 1/N)^2 * (1/N)

And we can express this as a sum, for any given x (where x is the number of additional messages we’ve written into our hash map):

P(overwritten after x further messages) = (1/N) * [1 + (1 - 1/N) + (1 - 1/N)^2 + ... + (1 - 1/N)^(x-1)]

Plotting this, for N=100, we get:

[Figure: probability of having overwritten a previously stored message after x further messages processed (x axis), for N = 100]

So what we are saying here is that with 100 buckets, after adding 459 additional messages, we are 99% certain to have overwritten our initial message and hence 99% certain that we won’t be able to de-duplicate this message if it turned up again.

This is a geometric series, so we can work out a closed form for the graph:

P(overwritten after x further messages) = 1 - (1 - 1/N)^x
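As a quick sanity check of the 459 figure above (a throwaway calculation, not part of nsqphp), we can rearrange this closed form to solve for x:

<?php
// Solve 0.99 = 1 - (1 - 1/N)^x for x, with N = 100 buckets
$n = 100;
$p = 0.99;                                  // target probability of having overwritten
$x = ceil(log(1 - $p) / log(1 - 1 / $n));
echo $x, "\n";                              // prints 459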

We can visualise this as it varies with both N and X:

[Figure: surface plot of 1 - (1 - 1/N)^x as it varies in x and N]

So suppose we want to be able to de-duplicate (with a 90% chance of success) a stream running at 1,000 events per second, where a duplicate may turn up as much as an hour later. For that, the probability of having overwritten the original message after x = 1000*60*60 = 3,600,000 further messages must be no more than 0.1:

0.1 = 1 - (1-1/N) ^ 3600000
0.9 = (1-1/N) ^ 3600000
0.999999970733191 = 1-1/N
1 / N = 0.000000029266809

So N = 34,168,399 – around 34 million buckets
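Again, as a rough check of the arithmetic (the same closed form, rearranged for N):

<?php
// Buckets needed for a 90% chance of still spotting a duplicate after
// 3,600,000 further messages (1,000 messages/sec for an hour)
$x = 1000 * 60 * 60;
$retain = 0.9;                              // required P(not overwritten) after x messages
$n = ceil(1 / (1 - pow($retain, 1 / $x)));
echo number_format($n), "\n";               // roughly 34.2 million buckets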

NSQPHP implementation

The @jmhodges implementation of the opposite of a Bloom filter has an atomic “check and set” to test membership. nsqphp ships with two implementations of the same basic interface. The first runs in a single process (and hence doesn’t have to worry about atomicity anyway, due to PHP’s lack of threads).

In this implementation I’m actually storing an MD5 of the entire content, to save space. This introduces a theoretical possibility of a false positive – saying we’ve seen a message when we haven’t – if two different messages happen to share the same MD5.

The second implementation uses Memcached to store the actual hash map; this completely ignores races, on the basis that a lost race only means we might fail to de-duplicate a message we otherwise could have.
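To give a flavour of the approach, here’s an illustrative sketch (not the actual nsqphp class; the key prefix and default bucket count are arbitrary):

<?php
// Sketch of a Memcached-backed "opposite of a Bloom filter". Each bucket is a
// Memcached key holding the MD5 of the last message that hashed there; races
// between workers are simply ignored.
class MemcachedDedupe
{
    private $mc;
    private $size;

    public function __construct(Memcached $mc, $size = 1000000)
    {
        $this->mc = $mc;
        $this->size = $size;
    }

    private function keyFor($content)
    {
        $bucket = hexdec(substr(md5($content), 0, 8)) % $this->size;
        return 'dedupe:' . $bucket;         // arbitrary key prefix
    }

    public function hasSeen($content)
    {
        return $this->mc->get($this->keyFor($content)) === md5($content);
    }

    public function add($content)
    {
        $this->mc->set($this->keyFor($content), md5($content));
    }
}

// Usage, assuming a local memcached:
//   $mc = new Memcached();
//   $mc->addServer('127.0.0.1', 11211);
//   $dedupe = new MemcachedDedupe($mc);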

The only other complication is with failed messages; here we need to erase our knowledge of having “seen” a message. To achieve this we simply update our hash map so that the message we’re interested in is no longer the content within the hash bucket:

if (entry at hash index foo is our content) {
    overwrite with some placeholder (eg: "deleted")
}
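With the Memcached-backed sketch above, that might look something like this (again just an illustration; the function name and the "deleted" placeholder are arbitrary):

<?php
// Forget a message we previously marked as seen (e.g. it failed and will be
// requeued), but only if its bucket still holds this message's digest
function eraseSeen(Memcached $mc, $content, $size = 1000000)
{
    $bucket = hexdec(substr(md5($content), 0, 8)) % $size;
    $key = 'dedupe:' . $bucket;
    if ($mc->get($key) === md5($content)) {
        $mc->set($key, 'deleted');          // any non-matching placeholder value works
    }
}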
