2

I have a stream of documents, each having an ID which is a 64 bit unsigned long given as a (maximum length is 20) string.

Sometimes an ID appears multiple times in the stream. I don't want to process a document if I already processed it, so I'm using Redis to cache last 20M document ID's. If 20M is exceeded, cache can be reset. I can afford to reprocess a document time to time.

I was trying to come up with a local solution that would save network trips to Redis.

First I tried to create 5 BitSets, each for a 4 digit part of IDs (5x4=20). I'd split an incoming id to 5 4-digit parts and check all 5 BitSets, and if every one of them contains it, than it means that ID is processed before. It was very fast and required little memory.

Unfortunately it was not that simple, my algorithm is flawed. Because I'm checking if there's an ID starting with these 4 digits, and then the next 4 digits etc. It gives false positives when all 5 groups exist independently but they don't exist together as a distinct ID.

Then I realized it's not possible to represent all possible ID's with this kind of setup.

I needed a growing data structure, something like a Trie. Fortunately Apache Commons Collections has a Patricia Trie implementation.

    PatriciaTrie<Boolean> t = new PatriciaTrie<>();
    
    StopWatch sw = new StopWatch();
    sw.start();
    sw.suspend();

    for(int i = 0; i < 20_000_000; i++) {
        String s = RandomStringUtils.random(ThreadLocalRandom.current().nextInt(20) + 1, false, true);
        sw.resume();
        t.containsKey(s);
        t.put(s, true);
        t.containsKey(s);
        sw.suspend();
    }
    
    sw.stop();

    System.out.println("took: " + sw.getTime());

Inserting and looking up 20M ID's takes 45 seconds and uses more than 2GB of RAM on my computer. I don't know if 45 seconds is much better than just using Redis, I need to test it, but 2+GB ram usage is a bit high. Maybe my random generated strings are contributing to that, even though they are used just once?

The next improvement I see is, PatriciaTrie is able to hold data in every node. I don't need that capability, I only need if a key exists or not.

Is this an empty endeavour that I should quit and just stick with Redis, or can there be an efficient algorithm? Since all ID's are numeric strings with max 20 digits, can there be a specific efficient solution to this?

uylmz
  • 1,129
  • 1
  • 8
  • 17
  • You're in Java, right? Your managed language is killing you in this case because whatever you do - PatriciaTrie or not - you've got heap overhead. And, too, because everything is in the heap you're not getting very good locality-of-reference so your searches in that tree aren't going all over the place and you're probably thrashing the TLB (i.e., these kind of searches against such a large dataset will never be _cache_ efficient, but in this case they're not even _page table_ efficient). Is it possible to pick another language _just for this data structure + search_? – davidbak Feb 24 '21 at 01:18
  • Actually this project is a large Apache Storm topology and my question is a bolt that does what I described. So I can write something in another language but I'd have to call it over the network, so the local advantage is lost, only possible improvement is it could be more efficient than Redis. – uylmz Feb 24 '21 at 14:58
  • Is there a language that comes to your mind? – uylmz Feb 24 '21 at 15:08
  • Oh, well. The language that always comes to _my_ mind is C++. It's unmanaged, you know. – davidbak Feb 24 '21 at 15:54

3 Answers3

3

Your bitset idea has promise and reminded me of Bloom filters which have gained popularity of late. I think the central problem - as you noted - is this: "How to handle possible duplicates/collisions?"

Fortunately this is a problem that has been studied heavily with respect to hash table implementation - see the "Collision Resolution" section of the Wikipedia article on hash tables. Much of this knowledge has been baked into existing libraries of course, but unfortunately that may not always translate perfectly into what you'd like to do.

It appears that you have a couple valuable constraints that may make the problem quite reasonable:

  1. ID's can be represented as UInt64's.
  2. Only 20M items need to be able to fit into the cache.

You've already considered that the key range of #1 is too big for memory - but #2 can be quite helpful. As pointed out elsewhere, the total number of bytes to store 20M UInt64's is quite reasonable at 160MB - but of course other data structure overhead can be quite heavy. It's still a useful reference point.

So first, are the built-in data structures that bad? I tried .NET's builtin HashSet<UInt64> and - depending on your key distribution - ended up using about 720MB of memory and it only took 6.5 seconds:

Estimated hash memory usage: 719833240
Total addition time: 00:00:06.4888422
Stats:
Added: 18851584
Found: 1148416

Using UInt64 directly probably saved quite a bit. (Although I'm afraid you'll have to see what Java can do.)

But can we do better by leveraging constraint #2 while still being relatively lazy/leveraging library built-ins? Quite possibly, yes. One simple approach is to create the world's simplest hash table ourselves with a fixed bucket count and essentially open addressing, then fallback to a builtin if the bucket gets full. We can easily turn up the number of slots per bucket as a space vs. time tradeoff and we still don't have to implement any of the hard work around collision management because we punt on the problem.

With this type of approach, I saw results like this on my box - code will be shared below. Note the slightly reduced memory usage and decently reduced time.

Estimated hash memory usage: 562606716 (total 723054508)
Total addition time: 00:00:02.6151904
Stats:
AddedInSlot: 18426506
FoundInPrimarySlot: 988489
FoundInLaterSlot: 146550
AddedInOverflow: 425078
FoundInOverflow: 13377

Code (caveat emptor - may be bugs! I also used explicit types for ultimate clarity):

using System;
using System.Collections.Generic;
using System.Collections;
using System.Diagnostics;

namespace BigHashChecker
{

    public enum AddResult
    {
        AddedInSlot,
        FoundInPrimarySlot,
        FoundInLaterSlot,
        FoundInOverflow,
        AddedInOverflow
    }

    public class BigHash
    {
        public const Int32 BUCKET_SIZE = 2;
        public const Int32 BUCKET_COUNT = 0x2000000; //Round 20M up a bit
        public const UInt32 BUCKET_MASK = BUCKET_COUNT - 1;
        public const UInt64 RESERVED_VALUE = 0; //Exclude ID 0 for simplicity

        public UInt64[] Buckets = new UInt64[BUCKET_SIZE * BUCKET_COUNT];
        public HashSet<UInt64> Overflow = new HashSet<UInt64>();

        public AddResult Add(UInt64 value)
        {
            //Find which bucket it should go in.
            //Note this greatly depends on the distribution of your keys
            //However, we will stick with the most naive method possible
            //for speed and simplicity right now.
            UInt32 whichBucket = (UInt32)(value & BUCKET_MASK);
            UInt32 bucketStartIndex = whichBucket * BUCKET_SIZE;
            for (UInt32 i = 0; i < BUCKET_SIZE; i++)
            {
                UInt64 slotValue = Buckets[bucketStartIndex + i];
                if (slotValue == RESERVED_VALUE)
                {
                    Buckets[bucketStartIndex + i] = value;
                    return AddResult.AddedInSlot;
                }
                else if (slotValue == value)
                {
                    return i == 0 ? AddResult.FoundInPrimarySlot : AddResult.FoundInLaterSlot;
                }
            }

            //Could not fit it into buckets, check/add to overflow.
            if (Overflow.Contains(value))
            {
                return AddResult.FoundInOverflow;
            }

            Overflow.Add(value);
            return AddResult.AddedInOverflow;
        }
    }

    class Program
    {
        private const int SIMULATED_DATA_COUNT = 20*1000*1000;
        static void Main(string[] args)
        {

            //UInt64[] simulatedData = GenerateUniformData(); //Uniform UIn64 - very sparse, virtually no overlap
            //UInt64[] simulatedData = GenerateDataOverRange(1,0x2000000); //Match the bucket count - minimal overlap
            //UInt64[] simulatedData = GenerateDataOverRange(1, (int)(0.95*SIMULATED_DATA_COUNT)); //Force ~5% overlap
            UInt64[] simulatedData = GenerateDataOverRange(1, 5*0x2000000); //Range of 5x the bucket count - mostly sparse but a few second/overflow slots
            Int64 sampleDataMemoryBaseLine = GC.GetTotalMemory(false);

            BigHash bigHash = new BigHash();
            Console.WriteLine("Running...");
            int[] addResults = new int[5];
            Stopwatch stopwatch = Stopwatch.StartNew();
            for (int i = 0; i < simulatedData.Length; i++)
            {
                AddResult result = bigHash.Add(simulatedData[i]);
                addResults[(int)result]++;
            }
            stopwatch.Stop();

            Int64 withHashMemoryBaseline = GC.GetTotalMemory(false);
            Int64 estimateMemoryBytes = withHashMemoryBaseline - sampleDataMemoryBaseLine;

            Console.WriteLine("Estimated hash memory usage: " + estimateMemoryBytes + " (total "+withHashMemoryBaseline+")");
            Console.WriteLine("Total addition time: " + stopwatch.Elapsed);
            Console.WriteLine("Stats:");
            Console.WriteLine(AddResult.AddedInSlot + ": " + addResults[(int)AddResult.AddedInSlot]);
            Console.WriteLine(AddResult.FoundInPrimarySlot + ": " + addResults[(int)AddResult.FoundInPrimarySlot]);
            Console.WriteLine(AddResult.FoundInLaterSlot + ": " + addResults[(int)AddResult.FoundInLaterSlot]);
            Console.WriteLine(AddResult.AddedInOverflow + ": " + addResults[(int)AddResult.AddedInOverflow]);
            Console.WriteLine(AddResult.FoundInOverflow + ": " + addResults[(int)AddResult.FoundInOverflow]);
            Console.ReadKey();
        }

        static UInt64[] GenerateUniformData()
        {
            UInt64[] simulatedData = new UInt64[SIMULATED_DATA_COUNT];
            Random random = new Random(Seed: 12345);
            byte[] uint64RandomBuffer = new byte[8];
            for (int i = 0; i < SIMULATED_DATA_COUNT; i++)
            {
                random.NextBytes(uint64RandomBuffer);
                simulatedData[i] = BitConverter.ToUInt64(uint64RandomBuffer, 0);
            }
            return simulatedData;
        }

        static UInt64[] GenerateDataOverRange(int minValue, int maxValue)
        {
            UInt64[] simulatedData = new UInt64[SIMULATED_DATA_COUNT];
            Random random = new Random(Seed: 123456);
            for (int i = 0; i < SIMULATED_DATA_COUNT; i++)
            {
                simulatedData[i] = (UInt64)random.Next(1, maxValue);
            }
            return simulatedData;
        }
    }
}

Sounds like you have a fun problem! I hope everything works out well in your implementation.

J Trana
  • 1,369
  • 9
  • 17
1

According to my calculation you would need roughly 560mb memory to put everything in memory in one go. 20 000 000 * 20bytes + 8bytes(64 bit for each pointer in the list) = aproximatly 560mb

If you double the memory to give som execution space. A program allocating 1.1G will have no problem processing in one go the complete set including filtering out the duplicates and processing only the unique elements.

You can effectivly reduce the memory impact by backing up the stream with some persistent structure. Ehcache f.ex. can have disk persistence. Reading up everything in kafka and so on...

Alexander Petrov
  • 927
  • 5
  • 14
1

If you feel like writing your own data structure, use a variant of a Skip List. A standard skip list keeps a list of items in sorted order, giving you O(log n) search complexity as well as O(log n) insertion complexity. But the trick here - and the reason to write your own (and an ordinary skip list isn't really hard, so start with that) is that you'll build an "n-way" skip list where the lowest level doesn't hold individual elements but instead is a small array of elements, kept in order. Perhaps 4-16 of your 64-bit numbers (as integers). (Sort of like the way a B-tree is a multi-way tree instead of a binary tree.) Reason to do this: You win big with the total amount of space used and with locality of reference since, in Java, arrays - unlike generic data structures - can hold unboxed integers. And all your "leaves" - the lowest level of the skip list - will be these arrays. So none of your 20M integers will be boxed, individually, into the heap.

(I'm just throwing this out there as an idea. Haven't seen this described anywhere ... which just means I'm not up-to-date on the literature. If you see a paper or web page describing (or implementing!) this data structure please add the reference here!)

davidbak
  • 712
  • 1
  • 7
  • 10