2

Say I have a reader and a producer that share a buffer. I have a wait queue for the reader to put itself in if there is no contents in the buffer when it reads. I also have an atomic variable for keeping track of the number of readers.

The producer wakes readers when contents are added to the buffer.

Is it cleaner to decrement the reader counter in the producer, after it does the waking, or the reader, after it wakes?

Example code

void producer(){
    add_to_buffer();
    if(atomic_read(num_waiting_readers) > 0){
        wake_event(consumer_queue);
        // Cleaner here?
        atomic_decrement(num_waiting_readers);
    }
}

void consumer(){
    if(buffer_isempty()){
        atomic_increment(num_waiting_readers);
        wait_event(consumer_queue, 0);
        // Or here, after wake?
        // atomic_decrement(num_waiting_readers);
    }
    read_buffer();
}

Edit: In my specific case, there is only one reader, but the producer might be called multiple times. The libraries are Linux kernel 4.9 (c language), although in my actual code I use the interruptible versions of wait_event and wake_event

  • My initial thoughts are that it is cleaner in the consumer, since that is where the increment is, and because there might be a delay between the wake call and the consumer being woken. But I'm a relatively inexperienced developer, so I would appreciate some other opinions. – silico-biomancer Sep 17 '20 at 22:27
  • 1
    Answers should focus on correctness first. Correctness in low-level concurrency is hard, in particular avoiding race conditions when using low-level primitives when one doesn't have the luxury of reliable concurrency libraries for that particular platform. Clean code is good in general, but clean code principles don't teach anything about how to write correct code in low-level concurrency. To express this in another way: First select the subset of answers that are correct. Then, from this subset of correct answers, select the cleanest code. – rwong Sep 18 '20 at 01:34
  • I will keep that in mind. In my head I'm thinking "clean" is synonymous with less likely to be wrong, since Martin's *Clean Code* discusses cleanliness in parallel programming is an important aspect of correctness. So I agree, correctness > pure cleanliness – silico-biomancer Sep 18 '20 at 01:41

3 Answers3

4

I'm not a parallelism expert, but here's what concerns me.  If there are multiple producers, then there is a race condition by the > 0 test and the wake/decrement.

Let's say there is only one consumer actually waiting on the queue, so the count is 1.  When the race condition happens, multiple producers will add to the buffer and both detect that there is at least 1 waiting consumer, both then waking & decrementing — taking the counter to -1.

Moving the decrement to before the wake shrinks window of the race condition but doesn't eliminate it.

I submit that the best approach is doing an operation that atomically both decrements if the value is > 0 and also let's you know that the decrement happened (or didn't b/c was 0 to start).

Erik Eidt
  • 33,282
  • 5
  • 57
  • 91
  • 1
    I agree with your concern. To check and increment in a single operation, you could probably use [CompareExchange](https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.compareexchange?view=netcore-3.1). – John Wu Sep 17 '20 at 23:48
  • I follow you about preferring it in the consumer, that is a good catch. I don't understand the reason for a decrement-with-check if you are in the consumer though. – silico-biomancer Sep 18 '20 at 01:44
  • 2
    @BlueDrink9 There may be more than one consumer. A reads counter (value = 2).Then B reads counter (value = 2). both de-increment value, and store it. Result counter == 1. When it should == 0. – Kain0_0 Sep 18 '20 at 03:16
3

I don't know what language/library you're using here, but for most, you're working at least two levels of abstraction too low. For keeping a count and blocking when the count is too low, you would typically use a semaphore. Additionally, there is a common even higher-level abstraction called a channel or concurrent queue where this is all handled for you. If it's not in the standard library, you should be able to find a third-party library.

As Erik's excellent answer points out, this sort of code is difficult to get correct. Another issue your code doesn't address is back pressure, which keeps producers from producing faster than consumers can consume. There are probably other unhandled boundary conditions I can't pinpoint. In production code, you should use the above higher-level abstractions whenever possible, because they've already thought it through and baked the solutions into the library.

Back to your original question, if you are wanting to work at this lower level as a learning exercise, or if there are other factors limiting the abstractions you can use, correctly addressing those tricky boundary cases will typically result in one thread doing the increment and the other thread doing the decrement.

Karl Bielefeldt
  • 146,727
  • 38
  • 279
  • 479
  • I have added details about the library (Linux kernel c). I don't quite understand your point about the increment and decrement being in different threads, however. If the process increments to signal that it is waiting, why would it not be responsible for decrementing as well? – silico-biomancer Sep 18 '20 at 01:47
  • 1
    For example, the producer might need to wait for a consumer to process the data before continuing. That requires part of the signaling be done on the consumer side. For another example, a consumer might need to time out or cancel itself. That requires the consumer to be responsible for the decrement. When using a semaphore, one side does the acquire and the other does the release. There might be simple situations where you can do the decrement either place, but I said "typically" because in typical real world situations, that's how it happens to shake out. – Karl Bielefeldt Sep 18 '20 at 01:55
2

Is it cleaner to decrement the reader counter in the producer, after it does the waking, or the reader, after it wakes?

The cleanest code is code where you can read it and tell confidently if it is correct or not.
For multi-threaded code, this means that after every statement where there is a possible interaction with another thread, you analyze what the effects are if your current thread got interrupted by that other thread at exactly that point of execution.

I will give an example of such an analysis on the consumer thread in your code. Each liner of code is prefixed with the thread it executes on and in comments are some observations from the analysis.

/*C*/ void consumer(){
/*C*/     if(buffer_isempty()){
/*C*/         atomic_increment(num_waiting_readers);
/*P*/ void producer(){
/*P*/     add_to_buffer();
/*P*/     if(atomic_read(num_waiting_readers) > 0){
/*P*/         wake_event(consumer_queue);            // Event fizzles, as there are no threads waiting on it!!
/*P*/         atomic_decrement(num_waiting_readers); // This prevents the next producer to re-trigger the event
/*P*/     }
/*P*/ }
/*C*/        wait_event(consumer_queue, 0);          // Consumer will wait forever to be woken
/*C*/     }
/*C*/     read_buffer();
/*C*/ }

The fix to this race condition is to do both the increment and the decrement in the consumer thread and accept that there might be multiple wake_event events before the consumer actually wakes up.

Bart van Ingen Schenau
  • 71,712
  • 20
  • 110
  • 179
  • The example makes this answer very clear and easy to understand, and answers the specific situation. I will accept this answer for now, thank you. The other answers are good for more general situations. – silico-biomancer Sep 19 '20 at 03:29