
I am building an application that aims to process ~10M data items per second.

Each item is exactly 42 bytes (including a sorting key), so the total data rate is not that big: 10M/s × 42 B ≈ 420 MB/s.

The entries must be kept sorted (either descending or ascending). Duplicate keys are possible (a multiset).

Depending on the input data, items are inserted, deleted, or modified (modification could be done as delete & insert, but in-place modification would be preferable). Modifications never alter the primary key, so an item's position within the sorted set does not change.

In order to handle ~10M input entries per second I will have to use concurrent operations.

I am not sure about the distribution of search/insert/delete, but let's assume they are uniformly distributed.

Most importantly: the complete data structure always fits into memory; it won't usually exceed 4-6 GB.
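
For concreteness, here is a minimal sketch of what such a 42-byte record could look like in C. The field names and the split between key and payload are assumptions; the question only fixes the total size and the presence of a sorting key.

```c
#include <stdint.h>

/* Hypothetical layout of one 42-byte record: an 8-byte sorting key
 * plus a 34-byte opaque payload. Field names are illustrative only. */
#pragma pack(push, 1)
typedef struct {
    uint64_t key;          /* sorting key (duplicates allowed)           */
    uint8_t  payload[34];  /* metadata; modified in place, key untouched */
} item_t;
#pragma pack(pop)

_Static_assert(sizeof(item_t) == 42, "record must stay exactly 42 bytes");
```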


I considered a lock-free B+tree, which could handle the desired data rate and would be very good at searching.

It is extremely important that no data is lost on unexpected shutdowns, etc. I could not find anything about making a B+tree fail-safe.

I have also read a lot about log-structured merge-trees (LSMs), which look very promising regarding fail-safety. Their advantage is write performance, but searching would be a bit slower. However, I could not find anything on how to make an LSM work in a concurrent environment.

After reading through a lot of papers I actually prefer LSMs, as they give faster write access plus fail-safety. The only downside is that I can't use multithreading with them, or can I?
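
One standard way to get LSM-style fail-safety for any in-memory ordered structure (including a lock-free B+tree) is a write-ahead log: append the operation to durable storage first, apply it to the in-memory structure second, and replay the log on restart. A minimal sketch, assuming POSIX I/O and a hypothetical tree_apply():

```c
#include <unistd.h>
#include <stdint.h>

/* One logged operation; op encodes insert/delete/modify. */
typedef struct { uint64_t key; uint8_t payload[34]; uint8_t op; } log_rec_t;

/* Hypothetical in-memory apply; the real structure (B+tree, skip list,
 * LSM memtable, ...) lives behind this call. */
void tree_apply(const log_rec_t *rec);

/* Durability first, visibility second: a record touches the in-memory
 * structure only after it is on stable storage. */
int wal_write_then_apply(int log_fd, const log_rec_t *rec)
{
    if (write(log_fd, rec, sizeof *rec) != (ssize_t)sizeof *rec)
        return -1;
    if (fsync(log_fd) != 0)  /* the expensive part; group-commit batches */
        return -1;           /* of records to amortize it                */
    tree_apply(rec);
    return 0;
}
```

At ~10M operations per second a per-record fsync is hopeless, so in practice many records are group-committed per write()/fsync() pair.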


I would of course be very pleased if someone could recommend other data structures that could possibly fulfill my needs.

Important:

I can't use a standard database because I have to apply arithmetic and additional calculations.

EDIT:

I am sorry that I wasn't precise enough. The following depicts the data flow:

NATS ---> Application ---> event-based messages

I am not using Kafka for several reasons; I use NATS as a streaming pipeline instead, which in my experience performs much better, and its features are sufficient for my needs.

The event based messages will be sent to a real database.

I said I want to process ~10 million messages per second. It's not just about parsing the 42-byte messages; it's about comparing some of them against the big sorted list in memory, but only against the first <20 entries. Furthermore, a single input message may modify 1..* database entries, depending on the contents of the input message. I think you would call that a balanced workload.

A clock budget of 10 MHz (one cycle per message) won't be enough to perform ~10 MIOPS. Each message involves

  1. processing the data,
  2. modifying the data structure (removals, insertions),
  3. putting the message into a large sorted multiset (insertion),

while maintaining "transactional" states (see the checkpointing sketch below).
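
One way to keep those "transactional" states without paying for durability on every operation is the batching scheme mentioned in the comments below: apply a batch of messages to the in-memory structure, then durably record the NATS stream offset; after a crash, replay the stream from the last saved offset. A sketch with hypothetical helper functions:

```c
#include <stdint.h>
#include <stddef.h>

#define BATCH 100000  /* batch size suggested in the comments; tune as needed */

/* Hypothetical helpers: the message source, the in-memory structure,
 * and a checkpoint file holding the last durably processed offset. */
typedef struct { uint64_t key; uint8_t payload[34]; } msg_t;
size_t stream_read(msg_t *buf, size_t max);   /* pull from NATS               */
void   structure_apply(const msg_t *m);       /* in-memory only, no disk      */
int    checkpoint_offset(uint64_t offset);    /* fsync'ed, written atomically */

/* Crash model: if the process dies mid-batch the offset was never
 * advanced, so NATS replays the whole batch on restart. Applies must
 * therefore be idempotent, or the structure restored from a snapshot
 * taken at the last checkpoint. */
void run(uint64_t start_offset)
{
    static msg_t buf[BATCH];
    uint64_t offset = start_offset;
    for (;;) {
        size_t n = stream_read(buf, BATCH);
        for (size_t i = 0; i < n; i++)
            structure_apply(&buf[i]);
        offset += n;
        checkpoint_offset(offset);  /* the only durable write per batch */
    }
}
```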

If you look into papers that compare B+trees, LSMs, etc., they use powerful multicore systems and only reach ~1 MIOPS under a balanced workload. That is far away from my desired ~10 MIOPS.


Amazing Papers

I am sorry that I wasn't precise enough in my first StackExchange question; thanks for all your answers, I really appreciate it.

In the following I describe data structures I encountered in the past few hours. Perhaps some of you will find them interesting.

Bw-Tree by Microsoft Research: a B-tree optimized for modern hardware. It sounded very promising, but on my machine (32 cores, a tree of 30 million items) I could only reach ~3M inserts/deletes per second, though an insane ~40M reads per second.

This patent by Google Inc. also looked very promising, although there aren't many measurements on it.

And the one that looks best: the BzTree. It is very recent work by some very smart people. In the coming days I am going to implement that data structure (there is no public implementation) and see how it performs. I like the fact that my server will use NVMe SSDs, which the BzTree supports.
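
The BzTree is built on (persistent) multi-word compare-and-swap: every update is a retry loop that publishes a change with CAS. The real PMwCAS spans several words and adds persistence ordering, which this deliberately omits, but the single-word C11 skeleton of that retry pattern looks like this:

```c
#include <stdatomic.h>
#include <stdint.h>

/* The basic lock-free update pattern the BzTree builds on: read,
 * compute a new value, try to publish it with CAS, and retry if
 * another thread interfered in the meantime. */
void cas_add(_Atomic uint64_t *word, uint64_t delta)
{
    uint64_t old = atomic_load_explicit(word, memory_order_relaxed);
    uint64_t new_val;
    do {
        new_val = old + delta;  /* any pure function of 'old' works here */
    } while (!atomic_compare_exchange_weak_explicit(
                 word, &old, new_val,
                 memory_order_release, memory_order_relaxed));
}
```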

Still, I would like to hear other alternatives, if you happen to know something new.


An example of what I am trying to achieve

We have measurements of mph and additional metadata (time of day, for example). I have to start searching from the lowest value and do some calculations and alter some data for each and every item until I reach a specific value in the list.

  1. 33 mph | additional metadata
  2. 34 mph | additional metadata
  3. 38 mph | additional metadata
  4. 51 mph | additional metadata
  5. 53 mph | additional metadata
  ...

Now an input message arrives containing a velocity of 50 mph. I need to iterate from the first item (1) to item (3), do some calculations on their metadata, and alter all of them. After that, the new datum (50 mph) is added to the list, which is why a simple vector would not be enough.

(This is not exactly what I am doing, but it should give you a feel for why the data must be sorted.)
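
In ordered-container terms the example is: iterate in key order from the minimum, update each entry's metadata until the incoming key is reached, then insert the new entry. A sketch against a hypothetical sorted-multiset API (the names are made up; a B+tree, skip list, or BzTree would expose equivalents):

```c
#include <stdint.h>

/* Hypothetical ordered-multiset API; any sorted structure with ordered
 * iteration and O(log n) insertion fits (B+tree, skip list, BzTree). */
typedef struct mset mset_t;
typedef struct { uint64_t speed; void *metadata; } entry_t;

entry_t *mset_first(mset_t *s);               /* entry with smallest key */
entry_t *mset_next(mset_t *s, entry_t *e);    /* in-order successor      */
void     mset_insert(mset_t *s, entry_t e);   /* duplicates allowed      */
void     recalc_metadata(entry_t *e, uint64_t incoming);  /* domain math */

/* Incoming message: 50 mph. Walk entries 33, 34, 38 (all < 50), update
 * their metadata in place (keys stay unchanged, so positions do not
 * move), then insert the new 50 mph entry. */
void handle_message(mset_t *s, uint64_t incoming_speed, void *meta)
{
    for (entry_t *e = mset_first(s);
         e != NULL && e->speed < incoming_speed;
         e = mset_next(s, e))
        recalc_metadata(e, incoming_speed);

    mset_insert(s, (entry_t){ .speed = incoming_speed, .metadata = meta });
}
```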

Comments

    "In order to achieve ~10M input entries I will have to use concurrent operations." Why? – candied_orange Nov 14 '18 at 02:33
  • 1
    How many operation sources are there? What are the serialisation requirements? Are some of these operations transactional? Are dirty reads, or writes permitted? What is the tolerance on data lose? What is the permissible recovery window? Is sorting strictly required at the data-structure level, or is merely the appearance of sortedness to an observer required? Are keys a piece of domain knowledge or can the data structure generate them? Are keys apart of the 48 bytes? How long are the keys? – Kain0_0 Nov 14 '18 at 04:15
  • 2
    @candied_orange: One single core would not be fast enough. I already tried a simple insert (withiut arithmetics) with one core and the best I could achieve was 1M. Sure I could optimize it even more, but I won‘t be able to handle 10 million data items per second – SmartArray Nov 14 '18 at 04:21
  • 1
    @Kain0_0: No serialization, it‘s already serialized within my C programm. The key is 4 byte (uint64_t). Actually there is no tolerance in data loss. Although I did not mention that I have an input stream (using NATS), which is able to buffer the input data and also offers replaying of input data. On exit, I can start ingesting data from a certain offset. I could for example do batch inserts (100k items) and then save the stream offset. If after that, the application exits, I can replay the data. Yes, sorting is required at this application, because I have to do arithmetics as fast as possible – SmartArray Nov 14 '18 at 04:32
  • So the stream is serialising all interactions with this data structure, including its responses? i.e.: >> lookup << result >> insert << added >> delete << done >> lookup << not found. I'm using the word in the sense of a database with concurrent transactions. – Kain0_0 Nov 14 '18 at 04:59
  • Nope, sorry, I still don't understand your determination to go with concurrency. ~10M items per second is ~10 MHz. My 8088 could handle that on one CPU back in the 80s. If all you want to do is sort, all you have to move around are references, so 42 bytes isn't a size issue. The question then becomes what weird kinda comparisons you're doing that take thousands of CPU clock cycles each time. – candied_orange Nov 14 '18 at 09:20
  • What is it about a database that precludes doing math on the data? – Blrfl Nov 14 '18 at 12:39
  • 3
    I may have missed something but what I think is missing here is what the retrieval requirements are. It's easy to make the inserts and updates fast if retrieval doesn't have any time constraints. You just keep adding each new thing to the end of your data. You also haven't clarified why you think that it will never be more than 4-6 GB. 6GB is a little less than 15 seconds of data. If I've done my math right, that means you will not have much more than 150 million different keys. Correct? – JimmyJames Nov 14 '18 at 21:02
  • 2
    So again to clarify, do the reads need to see the write made just before it? Do the writes need to see the write made just before it? These have large consequences on the data-structure, and algorithms used with it. Also you have not clarified the sorting requirement, you indicate that sorting is important to the maintenance of the data-structure (which is not necessarily so), so what is the O(?) requirement for data-access from the perspective of your algorithm. It appears from your suggested trees that O(log(n)) is okay, but a hash table can provide O(1) or even O(k). – Kain0_0 Nov 15 '18 at 05:57
  • @candied_orange "My 8088 could handle that on one CPU back in the 80s" - No way. This would assume you could perform the sort or other computation on an element in a single clock cycle. I think you're off by a factor of 50-100 here. – JimmyB Nov 15 '18 at 14:23
  • "which means the total data rate will not be big 420 MB/s" - That's not a meaningful number here. Whatever data structure you use to organize the data into, it will almost certainly result in a big relative overhead in I/O (because your data items are so small) of maybe 100-200%; then add to that that your data structure will likely require random reads/writes at different locations of some page size number of bytes and your required 'bandwidth' will multiply. If you really need transactionality, or 'fail-safe', every write multiplies again and gets throttled by synchronization to disk. – JimmyB Nov 15 '18 at 14:31
  • @JimmyB That is generally true, but look into the last paper. It's really amazing what it promises!! – SmartArray Nov 15 '18 at 15:31
  • @Blrfl Sorry, I can't tell you much about it. Please read the example above. – SmartArray Nov 15 '18 at 15:31
  • 1
    @Kain0_0 yes, data must be transactional. It‘s not too critical that a previous write is overseen, but it should not happen. Sorting is important, imagine the following: We have measurements of mp/h and additional metadata (daytime for example). I have to start searching from the lowest and do some calculations and alter some data (This is not what I am doing but you should get a feel why it must be sorted) – SmartArray Nov 15 '18 at 15:32
  • 1
    It's still not clear why it's required to update in place. It will greatly complicate any sort of concurrency and likely slow things down a lot. You also lose the history which would seem to be interesting. Seems like a lose-lose to me. The only advantage is space savings. With 64 architecture, you can use a lot memory and it's cheap these days. – JimmyJames Nov 15 '18 at 15:57
  • 1
    @JimmyJames Thank you for that. I did not know that the English term "velocity" is describing a vector :-) You're right, the history would be indeed very interesting, but I decided to do it because it fits better into memory and it would become too big over the years. I want it to be loaded very quickly when the applications starts – SmartArray Nov 15 '18 at 16:05
  • 1
    @SmartArray The simplest option I could see would be to have actors that work from oldest to newest to clean things up. This is pretty standard stuff e.g. Kafka. The advantage is that your inserts and your cleanup are looking at completely different elements so there's no contention. To get the current state you work from newest to oldest events to find relevant data. If you want to speed up the retrieval of the current state, you could maintain an index. If you can provide a little more detail on retrieval requirements, it would give a fuller picture of the problem space. – JimmyJames Nov 15 '18 at 16:17
  • 1
    Kafka seemed to be too slow... is it capable to process 10M messages per second in realtime? I think of the following: Every time an element changes it will emit events to another system. These will come directly into a DB. There is no real retrieval. To be honest, I was searching for a data structure, because I have a lot of interest in them – SmartArray Nov 15 '18 at 16:29
  • @JimmyB That factor you think I'm off by is exactly what I'm asking the OP to add to the question. Telling me the item is 42 bytes long isn't enough to answer that. – candied_orange Nov 15 '18 at 16:44
  • @candied_orange **42** bytes is *not* [the answer](https://en.wikipedia.org/wiki/42_(number)#The_Hitchhiker%27s_Guide_to_the_Galaxy)?! ;-) – JimmyB Nov 15 '18 at 16:48
  • What isn't answered yet? I don't get it. Does "transactional" not answer your question? – SmartArray Nov 15 '18 at 17:36
  • 1
    If the data structure never grows beyond 6GB, that's a mere 15s at this rate. So you only have to sustain the 10M/s rate for a few seconds, not continuously? If so, just stream the raw entries to disk and afterwards sort out any metadata. Also rethink whether you really have to avoid any data loss within that timeframe. If you do need to maintain this rate continuously, where does the 6GB limit come from? – amon Dec 09 '18 at 14:36
  • 1
    6GB limit was an approximate. Somewhere I mentioned that I also delete items (or alter them). But the max memory usage would be 6GB according to my calculations. Actually yes, the data is extremely critical and must be persisted on disk, no item must get ever lost... I know that all sounds weird, but the question was already solved myself; I am going to use BzTree (which is a bit difficult to implement, but I think I am going to succeed!). – SmartArray Dec 09 '18 at 14:40
  • I'm wondering if you have tunnel vision on the problem that you need to solve. There is not enough here to think about the wider problem and bounce some thought-through ideas back at you, but this looks like the kind of problem where having a singleton in your infrastructure is a poor choice: you lose the instance, and you then have to both recover and catch up... – Michael Shaw Oct 11 '21 at 03:54
  • Is there a natural partition key, e.g. vehicle id? Or more than one? If so, this may open up additional possibilities not yet discussed. Hard to know from the details provided so far. – Jason Weber Sep 13 '22 at 04:38
  • "The key is 4 byte (uint64_t)"!? 4*8=32 – gapsf Oct 09 '22 at 05:52
  • (4 gigahertz) / (10 megahertz) = 400. So everything (ALL the code that fully processes one item) must be done within 400 CPU cycles per item on a 4 GHz core. – gapsf Oct 09 '22 at 05:58
  • SmartArray is a "blockchain expert" - so yummy. https://softwareengineering.stackexchange.com/users/320132/smartarray – gapsf Oct 09 '22 at 07:26
  • @gapsf I don't know what you meant to say with your last comment, but in case you were trying to say anything negative, please consult this link: https://softwareengineering.stackexchange.com/conduct – SmartArray Oct 11 '22 at 14:23
  • @gapsf Thank you for telling me something so clear. I thought one byte is 16 bits. My bad. Moreover, the heading says "concurrent". – SmartArray Oct 11 '22 at 14:25
  • Did you manage to process 10M items per second? – gapsf Oct 11 '22 at 15:26
  • The setup achieved more than 10M, depending on the data distribution. The best case was 16M, the worst 500k, using NVMe SSDs. – SmartArray Oct 11 '22 at 16:28

1 Answer


I think you should look at using the TPL (Task Parallel Library) by Microsoft. If I correctly understand the scenario outlined in your question, it would provide you with the low-level concurrency primitives needed to implement your solution.

I would suggest you start by taking a look at the Processing Pipelines Series - TPL, which outlines a scenario strikingly similar to your own!
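
The TPL itself is .NET; for the C program mentioned in the question, the equivalent building block of such a pipeline is a bounded queue between stages, each stage running on its own thread. A minimal sketch with POSIX threads (at 10M msgs/s the mutex would become the bottleneck, so a real implementation would batch many messages per slot or use a lock-free ring):

```c
#include <pthread.h>
#include <stddef.h>

/* A bounded blocking queue: the C/pthreads analogue of a TPL pipeline
 * stage's input buffer. Stages are chained producer -> queue -> consumer. */
#define QCAP 1024
typedef struct {
    void           *slot[QCAP];
    size_t          head, tail, count;
    pthread_mutex_t mu;
    pthread_cond_t  not_full, not_empty;
} queue_t;

void q_init(queue_t *q) {
    q->head = q->tail = q->count = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

void q_push(queue_t *q, void *item) {   /* blocks while the queue is full  */
    pthread_mutex_lock(&q->mu);
    while (q->count == QCAP)
        pthread_cond_wait(&q->not_full, &q->mu);
    q->slot[q->tail] = item;
    q->tail = (q->tail + 1) % QCAP;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
}

void *q_pop(queue_t *q) {               /* blocks while the queue is empty */
    pthread_mutex_lock(&q->mu);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->mu);
    void *item = q->slot[q->head];
    q->head = (q->head + 1) % QCAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mu);
    return item;
}
```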