I’ve recently started playing around with NSQ, in my hunt for a good, resilient, highly available, guaranteed “at least once” delivery queue. That’s a lot of adjectives, but basically it boils down to a queue that puts a copy of messages on N nodes and is able to operate (without losing messages) with any X of them failing, obviously where X < N.

NSQ attacks this problem in an interesting way. Rather than trying to form a cluster (in the sense that, say, RabbitMQ does), it treats each **`nsqd`** instance as a separate entity. Only the clients, and the directory service **`nsqlookupd`**, know that there is more than one of them. This makes it very reliable, in the sense that there are no troublesome master/slave relationships to preserve or leaders to elect.

This simplicity forces some of the work back on the client.

- NSQ is guaranteed “at least once”, rather than “exactly once”; hence subscribers should operate in an idempotent way
- when used with replication, it is up to the client to de-duplicate the messages on subscription

### Deduplication

To de-duplicate, a subscriber needs to determine if it has seen a message before. Doing so in an accurate way would involve storing all the message IDs or some digest of the message itself in a large hash table. With this we could simply test:

    if (message is in hash map) {
        ignore
    } else {
        process
    }

Then we just need to make sure we add every message we see to the hash map. With a lossless hash map (i.e. one that stores everything), this is going to use unbounded memory.
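As a baseline, here is a minimal Python sketch of that lossless approach. The names `seen` and `handle` are made up for illustration, Python's built-in `set` stands in for the hash map, and SHA-256 is just one possible choice of digest.

```python
import hashlib

seen = set()  # grows by one digest per distinct message, forever


def handle(message: bytes, process) -> bool:
    """Process `message` unless an identical one was handled before.

    Returns True if the message was processed, False if it was ignored.
    """
    digest = hashlib.sha256(message).digest()
    if digest in seen:
        return False  # duplicate: ignore
    seen.add(digest)
    process(message)
    return True


processed = []
handle(b"order-1", processed.append)
handle(b"order-2", processed.append)
handle(b"order-1", processed.append)  # second copy, e.g. from another nsqd
print(processed)  # [b'order-1', b'order-2']
```

Nothing is ever evicted from `seen`, which is exactly the unbounded-memory problem.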

### The opposite of a Bloom filter

Bloom filters were my first thought when trying to come up with a way of bounding memory. A Bloom filter is a probabilistic data structure that tests whether some element is a member of a set. It will never tell you an item is not in the set when it is (no false negatives), but it may tell you an item is in the set when really it isn’t (chance of false positives).

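What I actually want is _the opposite_ of a Bloom filter: something that may forget it has seen a message (false negatives), but never claims to have seen a message it hasn’t (no false positives).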

http://lmgtfy.com/?q=opposite+of+a+bloom+filter

So picking the first link on Google, I checked out the blog post on somethingsimilar.com. @jmhodges’s solution is simple; use a fixed-size hash map and then simply overwrite entries on collision. Let’s go through that slowly.

Here’s our hash map, with 10 buckets:

Now we process our first message and add it:

To test if some new message has been seen we need to check whether we have got **exactly this message content** within the appropriate bucket. If the content does match, then we can be sure we’ve seen it. If the content does not match, then we cannot know. The reason is that we may have *just* overwritten this message with a new message that collided into the same bucket.

So now we write in our next message, and it hashes to the same bucket. At this point we’ve lost our knowledge of having ever seen the first message we processed.
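The walkthrough above can be sketched in a few lines of Python. This is an illustration of the idea rather than @jmhodges’s code: the class name, the method name `seen_before` and the use of SHA-256 to pick a bucket are all assumptions.

```python
import hashlib


class OppositeOfBloomFilter:
    """Fixed-size table that overwrites on collision.

    A "seen" answer means the exact content is still in its bucket;
    a "not seen" answer may be wrong if the entry was overwritten.
    """

    def __init__(self, buckets: int):
        self.slots = [None] * buckets

    def seen_before(self, message: bytes) -> bool:
        """Check and set: report whether `message` is in its bucket, then store it."""
        digest = hashlib.sha256(message).digest()
        index = int.from_bytes(digest[:8], "big") % len(self.slots)
        previously = self.slots[index] == message
        self.slots[index] = message
        return previously


f = OppositeOfBloomFilter(buckets=1)  # one bucket, so every message collides
print(f.seen_before(b"first"))   # False: nothing stored yet
print(f.seen_before(b"first"))   # True: exact content is still in its bucket
print(f.seen_before(b"second"))  # False, and it overwrites b"first"
print(f.seen_before(b"first"))   # False: we have forgotten b"first"
```

The single bucket is only there to force a collision; with more buckets the last line would usually print `True`.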

### Deciding how big to make it

So with this data structure, we will lose knowledge of messages we have seen; however we can determine how quickly this happens by choosing the size of our hash map (how many buckets we have).

Intuitively, there is a trade-off between the amount of space used and our ability to detect duplicates. At one extreme, with 1 bucket, we can only ever de-duplicate a message whose duplicate arrives immediately after it. At the other extreme, with a huge number of buckets, we can *nearly always* de-duplicate (we are bounded by our hash function’s ability to determine unique values for different content).

To get a clearer picture, we can consider our implementation in terms of probability. Starting with a single message stored, the probability of overwriting this message with the next message (assuming a perfectly random hash function), is 1/N, where N is the number of buckets.

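On our next go, the chance of overwriting *on this go* is:

$$\left(1 - \frac{1}{N}\right)\frac{1}{N}$$

This combines the probability of _not_ having overwritten on the first go with the probability of overwriting this time. To get the probability of having overwritten *by this go*, we simply add up:

$$\frac{1}{N} + \left(1 - \frac{1}{N}\right)\frac{1}{N}$$

Our next go looks like this:

$$\frac{1}{N} + \left(1 - \frac{1}{N}\right)\frac{1}{N} + \left(1 - \frac{1}{N}\right)^{2}\frac{1}{N}$$

And we can express this as a sum, for any given x (where x is the number of additional messages we’ve written into our hash map):

$$y = \sum_{i=0}^{x-1}\left(1 - \frac{1}{N}\right)^{i}\frac{1}{N}$$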

Plotting this, for N=100, we get:

So what we are saying here is that with 100 buckets, after adding 459 additional messages, we are 99% certain to have overwritten our initial message and hence 99% certain that we won’t be able to de-duplicate this message if it turned up again.
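To check that figure, here is a small Python sketch. It assumes every write lands in a uniformly random bucket, and the function names are mine. It evaluates the probability both as the running sum described above and as the closed form 1 − (1 − 1/N)^x that the sum collapses to.

```python
import math


def p_overwritten(x: int, n: int) -> float:
    """Chance one stored entry has been overwritten after x further writes into n buckets."""
    return 1 - (1 - 1 / n) ** x


def p_overwritten_by_sum(x: int, n: int) -> float:
    """The same value, built up one write at a time."""
    return sum((1 - 1 / n) ** i * (1 / n) for i in range(x))


def writes_until(y: float, n: int) -> int:
    """Smallest x for which p_overwritten(x, n) >= y."""
    return math.ceil(math.log(1 - y) / math.log(1 - 1 / n))


print(writes_until(0.99, 100))  # 459
```

With 458 writes the probability is still just under 99%, which is why 459 is the threshold.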

We can work out the equation of this graph, since the geometric sum collapses to:

$$y = 1 - \left(1 - \frac{1}{N}\right)^{x}$$

We can visualise this as it varies with both N and x:

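Remember that y is the probability of having *lost* a message, so a 90% chance of de-duplicating means y = 0.1. So if we want to be able to de-duplicate (to 90% chance) a stream running at 1,000 events per second, with an hour delay (y = 0.1, x = 1000*60*60):

$$\begin{aligned}
0.1 &= 1 - \left(1 - \tfrac{1}{N}\right)^{3600000} \\
0.9 &= \left(1 - \tfrac{1}{N}\right)^{3600000} \\
0.99999997073319 &\approx 1 - \tfrac{1}{N} \\
\tfrac{1}{N} &\approx 0.00000002926681
\end{aligned}$$

So N ≈ **34.2 million**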
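The same rearrangement can be wrapped in a small Python helper for trying other rates and delays. This is a sketch with a made-up function name. Here y is the probability that the message has been overwritten, so the chance of still being able to de-duplicate it is 1 − y.

```python
import math


def buckets_needed(x: int, y: float) -> float:
    """Solve y = 1 - (1 - 1/N)**x for N.

    x: messages written after the one we care about
    y: accepted probability that it has been overwritten by then
    """
    # 1/N = 1 - (1 - y)**(1/x); log1p/expm1 avoid rounding loss for tiny 1/N
    return 1 / -math.expm1(math.log1p(-y) / x)


x = 1000 * 60 * 60  # 1,000 events per second for an hour
print(round(buckets_needed(x, 0.9)))  # 1563461 (90% chance the message is gone)
print(round(buckets_needed(x, 0.1)))  # roughly 34.2 million (90% chance it survives)
```

Going from a 10% to a 90% chance of catching a duplicate costs roughly 22 times as many buckets.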

### NSQPHP implementation

The @jmhodges implementation of the opposite of a Bloom filter has an atomic “check and set” to test membership. **nsqphp** ships with two implementations which implement the same basic interface. The first runs in a single process, so it doesn’t have to worry about atomicity anyway, thanks to PHP’s lack of threads.

In this implementation I’m actually using an MD5 of the entire content, to save space. This introduces a theoretical possibility of a false positive (saying it’s seen a message when it hasn’t), should two different messages share the same MD5.

The second implementation uses Memcached to store the actual hash map; this completely ignores races on the basis that they will only mean we may not quite de-duplicate as many messages as we could have.

The only other complication is with failed messages; here we need to *erase* our knowledge of having “seen” a message. To achieve this we simply update our hash map so that the message we’re interested in is no longer the content within the hash bucket:

    if (entry at hash index foo is our content) {
        overwrite with some placeholder (eg: "deleted")
    }
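Here is a minimal Python sketch of that erase step, combined with storing an MD5 digest instead of the full content. It is not the nsqphp code: the class and method names are hypothetical, and the single-process list stands in for whatever backing store is used.

```python
import hashlib

DELETED = b"deleted"  # placeholder; 7 bytes, so it never equals a 16-byte MD5 digest


class DigestDeduper:
    def __init__(self, buckets: int):
        self.slots = [None] * buckets

    def _locate(self, message: bytes):
        """Return (bucket index, digest) for a message."""
        digest = hashlib.md5(message).digest()
        return int.from_bytes(digest[:8], "big") % len(self.slots), digest

    def seen_before(self, message: bytes) -> bool:
        """Check and set, comparing digests rather than full content."""
        index, digest = self._locate(message)
        previously = self.slots[index] == digest
        self.slots[index] = digest
        return previously

    def forget(self, message: bytes) -> None:
        """Erase `message`, but only if its own digest still occupies the bucket."""
        index, digest = self._locate(message)
        if self.slots[index] == digest:
            self.slots[index] = DELETED


d = DigestDeduper(buckets=16)
print(d.seen_before(b"job"))  # False: first delivery
d.forget(b"job")              # processing failed, so erase our knowledge of it
print(d.seen_before(b"job"))  # False: the retry is not mistaken for a duplicate
print(d.seen_before(b"job"))  # True: now it really is a duplicate
```

The comparison inside `forget` matters: without it, a failed message could wipe out a different message that has since taken over the same bucket.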
