First I give the main points, and then a code sketch.
There are no limitations on queue size or on the number of producers/consumers. The implementation requires only a CAS instruction. Values must be word-sized (if DWCAS is supported, values can be double-word-sized). There must be some reserved value that is never enqueued (currently 0).
The main point is this: as the underlying data structure I use a nested list, i.e. values are organized in contiguous buffers of some constant size (nests), and the nests are linked in a singly-linked list. A nest is filled with values only once and consumed only once, and then it is recycled. So there are no cyclic buffers, unlike a bounded queue on top of a vector.
Schema of the dequeue operation:
1. Get the current tail nest.
2. Read the current dequeue position in this nest.
3. If the current tail nest is fully consumed and there is a next nest, then recycle the nest and try to move to the next nest.
4. If the current tail nest is fully consumed and there is no next nest, then return 0.
5. Read the value at the current dequeue position.
6. If value == 0, then return 0.
7. Try to increment the current dequeue position with CAS.
8. If successful, return the value.
9. If it fails, go to 2.
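A minimal C++ sketch of dequeue steps 2 and 5-9 on a single nest, assuming values are stored as `std::uintptr_t` (word-sized, with 0 reserved as above) and `std::atomic` stands in for raw CAS. The names `nest`, `kNestSize`, `deq_result` and `dequeue_from_nest` are hypothetical; moving to the next nest (steps 3-4) and recycling are left to the caller and signalled by `nest_consumed`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical single-nest model; 0 is the reserved "never enqueued" value.
constexpr std::size_t kNestSize = 8;

struct nest {
    std::atomic<std::uintptr_t> buffer[kNestSize];
    std::atomic<std::size_t> dequeue_pos{0};
    nest() {
        for (auto& slot : buffer) slot.store(0, std::memory_order_relaxed);  // zeroized nest
    }
};

enum class deq_result { value, empty, nest_consumed };

// Steps 2 and 5-9 of the dequeue schema, restricted to one nest.
deq_result dequeue_from_nest(nest& n, std::uintptr_t& out) {
    for (;;) {
        std::size_t pos = n.dequeue_pos.load(std::memory_order_acquire);  // step 2
        assert(pos <= kNestSize);
        if (pos == kNestSize) return deq_result::nest_consumed;  // steps 3-4 belong to the caller
        std::uintptr_t v = n.buffer[pos].load(std::memory_order_acquire);  // step 5
        if (v == 0) return deq_result::empty;                               // step 6
        // step 7: claim slot pos by moving the dequeue position forward
        if (n.dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel)) {
            out = v;  // step 8
            return deq_result::value;
        }
        // step 9: another consumer claimed the slot (or the CAS failed spuriously); retry
    }
}
```

Because a slot's value never changes after it is written, winning the CAS on `dequeue_pos` is enough to own the value read just before it.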
Important points about the enqueue operation: I don't maintain the current enqueue position in the nest at all. Instead I use an enqueue position hint and a somewhat sophisticated oracle to determine the actual enqueue position. A nest is zeroized after allocation, so I use CAS(&enqueue_position, value, 0) to enqueue a value. After a successful enqueue I update the enqueue position hint with a plain store operation.
Algorithm of the oracle that determines the actual enqueue position (subject to modifications and improvements):
1. Check the current enqueue position hint.
2. If that fails, check several next positions.
3. If that fails, use binary search in [enqueue_position_hint, end_of_nest] to find the right position.
Note: the hint can't point to a position after the actual enqueue position, only before it.
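A hedged C++ sketch of the three-stage oracle. It assumes (this is not spelled out in the post) that filled slots always form a prefix of the nest, which holds if a producer only ever CASes the first empty slot it has observed; then "is this slot empty" is monotone along the buffer and binary search applies. `kLookAhead` and the function signature are hypothetical, and the result is only a prediction: the caller still has to win the CAS.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kNestSize = 1000;  // same value as nest_size in the sketch
constexpr std::size_t kLookAhead = 4;    // hypothetical look-ahead window

// Predicts the actual enqueue position: the first empty (zero) slot, or
// kNestSize if the nest is full. Assumes filled slots form a prefix of the
// buffer and that hint never points past the first empty slot.
std::size_t enqueue_pos_oracle(const std::atomic<std::uintptr_t>* buffer, std::size_t hint) {
    assert(hint <= kNestSize);
    auto is_empty = [buffer](std::size_t i) {
        return buffer[i].load(std::memory_order_acquire) == 0;
    };
    // 1. Check the hint itself.
    if (hint == kNestSize || is_empty(hint)) return hint;
    // 2. Check a few positions right after the hint.
    const std::size_t scan_end = std::min(hint + 1 + kLookAhead, kNestSize);
    for (std::size_t i = hint + 1; i < scan_end; ++i)
        if (is_empty(i)) return i;
    // 3. Binary search for the filled/empty boundary in [scan_end, kNestSize).
    //    Every slot before scan_end was seen filled, so the boundary is at or after it.
    std::size_t lo = scan_end, hi = kNestSize;
    while (lo < hi) {
        const std::size_t mid = lo + (hi - lo) / 2;
        if (is_empty(mid)) hi = mid; else lo = mid + 1;
    }
    return lo;
}
```

Slots only ever go from 0 to non-zero within a nest's lifetime, so concurrent enqueues can only move the boundary forward while the search runs.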
Schema of the enqueue operation:
1. Get the current head nest.
2. Read the current enqueue position hint in this nest.
3. If the nest is fully filled, then allocate a new nest and link it in as the head.
4. Try to write the value with CAS(&enqueue_position, value, 0).
5. If successful, return.
6. If it fails, use the oracle to predict the right enqueue position and go to 3.
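The enqueue side for a single nest, under the same assumptions (`std::uintptr_t` values, 0 reserved, `std::atomic` in place of raw CAS). The oracle is replaced by a hypothetical linear scan from the hint, and the "plain store" of the hint becomes a relaxed atomic store, since a non-atomic store racing with loads is undefined behaviour in C++. Returning `false` stands in for step 3 (allocating and linking a new nest).

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kNestSize = 8;

struct nest {
    std::atomic<std::uintptr_t> buffer[kNestSize];
    std::atomic<std::size_t> enqueue_pos_hint{0};  // may lag behind, never runs ahead
    nest() {
        for (auto& slot : buffer) slot.store(0, std::memory_order_relaxed);  // zeroized nest
    }
};

// Enqueue steps 2 and 4-6 for one nest. Returns false when the nest is full,
// which is where the real queue would allocate and link a new nest (step 3).
bool enqueue_into_nest(nest& n, std::uintptr_t v) {
    assert(v != 0);  // 0 is reserved as the "empty slot" value
    std::size_t pos = n.enqueue_pos_hint.load(std::memory_order_relaxed);  // step 2
    for (;;) {
        // Hypothetical stand-in for the oracle: scan forward to the first empty slot.
        while (pos < kNestSize && n.buffer[pos].load(std::memory_order_acquire) != 0) ++pos;
        if (pos == kNestSize) return false;  // step 3 is the caller's job
        std::uintptr_t expected = 0;
        // step 4: CAS(&buffer[pos], v, 0)
        if (n.buffer[pos].compare_exchange_strong(expected, v, std::memory_order_release,
                                                  std::memory_order_relaxed)) {
            // "Plain store" of the hint; a relaxed atomic store avoids a C++ data race.
            n.enqueue_pos_hint.store(pos + 1, std::memory_order_relaxed);
            return true;  // step 5
        }
        // step 6: another producer filled this slot; the scan resumes from pos
    }
}
```

Concurrent producers may store the hint out of order, so it can move backwards; that is harmless because it still never points past the first empty slot.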
Implementation notes.
1. For managing nest lifetime and preventing ABA I use a PDR scheme for nests. I cache the current nest in TLS, so an atomic RMW is issued only once per nest. The head and tail pointers are strong PDR pointers; all other pointers to nests are normal PDR pointers. Maybe some other scheme could be used here to prevent ABA and premature node reuse, but I use PDR for simplicity.
2. For allocation/deallocation of nests I use a lock-free stack-based freelist. Before a nest is placed on the freelist, it is cleared, i.e. the buffer is zeroized and the enqueue/dequeue positions are set to 0. If there are no nests in the freelist, allocation requests are forwarded to the system allocator.
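A sketch of note 2, a lock-free stack-based freelist that clears nests before publishing them. One substitution should be stated plainly: the post relies on PDR to rule out ABA, whereas this sketch avoids ABA by drawing nests from a fixed pool and packing a 32-bit version counter next to the top index in one 64-bit word, so only single-word CAS is needed. The pool, `nest_freelist`, and the behaviour on exhaustion (`pop` returns `nullptr` instead of calling the system allocator) are assumptions for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

constexpr std::size_t kNestSize = 8;
constexpr std::uint32_t kNil = UINT32_MAX;  // "no nest" index

struct nest {
    std::atomic<std::uintptr_t> buffer[kNestSize];
    std::atomic<std::size_t> enqueue_pos_hint{0};
    std::atomic<std::size_t> dequeue_pos{0};
    std::atomic<std::uint32_t> next_free{kNil};  // freelist link, as a pool index
};

// Lock-free stack of free nests drawn from a fixed pool. The top word packs
// {version:32, index:32}; bumping the version on every change defeats ABA.
class nest_freelist {
    std::unique_ptr<nest[]> pool_;
    std::uint32_t count_;
    std::atomic<std::uint64_t> top_;

    static std::uint64_t pack(std::uint32_t index, std::uint32_t version) {
        return (std::uint64_t(version) << 32) | index;
    }
    static std::uint32_t index_of(std::uint64_t w) { return std::uint32_t(w); }
    static std::uint32_t version_of(std::uint64_t w) { return std::uint32_t(w >> 32); }

public:
    explicit nest_freelist(std::uint32_t count)
        : pool_(new nest[count]), count_(count), top_(pack(kNil, 0)) {
        for (std::uint32_t i = 0; i < count; ++i) push(&pool_[i]);
    }

    // Clears the nest (zeroized buffer, positions reset to 0), then pushes it.
    void push(nest* n) {
        const std::uint32_t index = std::uint32_t(n - pool_.get());
        assert(index < count_);  // n must come from this freelist's pool
        for (auto& slot : n->buffer) slot.store(0, std::memory_order_relaxed);
        n->enqueue_pos_hint.store(0, std::memory_order_relaxed);
        n->dequeue_pos.store(0, std::memory_order_relaxed);
        std::uint64_t old = top_.load(std::memory_order_relaxed);
        do {
            n->next_free.store(index_of(old), std::memory_order_relaxed);
        } while (!top_.compare_exchange_weak(old, pack(index, version_of(old) + 1),
                                             std::memory_order_release,
                                             std::memory_order_relaxed));
    }

    // Returns nullptr when the pool is exhausted (the post would then fall back
    // to the system allocator, which is not modelled here).
    nest* pop() {
        std::uint64_t old = top_.load(std::memory_order_acquire);
        for (;;) {
            const std::uint32_t index = index_of(old);
            if (index == kNil) return nullptr;
            const std::uint32_t next = pool_[index].next_free.load(std::memory_order_relaxed);
            if (top_.compare_exchange_weak(old, pack(next, version_of(old) + 1),
                                           std::memory_order_acquire,
                                           std::memory_order_acquire))
                return &pool_[index];
        }
    }
};
```

A stale `next` read in `pop` is harmless: if the nest was popped and pushed back in the meantime, the version no longer matches and the CAS fails.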
Here is a code sketch:

#include <cstring> // memset

// Assumed helpers, not shown: global_ptr/local_ptr are PDR smart pointers
// (strong/normal), alloc_node/free_node use the nest freelist, and
// nest_is_over, check_enqueue_pos and enqueue_pos_oracle inspect a nest.
// Semantics inferred from how they are used below:
// CAS(dst, xchg, cmp) returns the previous value of dst;
// CAS_fail(dst, xchg, cmp) returns true if the CAS failed.

unsigned const nest_size = 1000;

struct nest_t
{
    // can be incorrect
    unsigned enqueue_pos_hint_;
    // always maintained in correct state
    unsigned dequeue_pos_;
    void* buffer_[nest_size];
    nest_t* next_;

    nest_t()
    {
        next_ = 0;
        enqueue_pos_hint_ = 0;
        dequeue_pos_ = 0;
        memset(buffer_, 0, sizeof(buffer_));
    }
};

struct mpmc_queue_t
{
    global_ptr<nest_t> head_;
    global_ptr<nest_t> tail_;

    mpmc_queue_t()
    {
        head_ = tail_ = alloc_node();
    }

    void enqueue(void* value)
    {
        // here strong acquire of head
        // cached in tls, so atomic rmw issued
        // only once for nest
        local_ptr nest (head_);
        unsigned pos = nest->enqueue_pos_hint_;
        do
        {
            if (nest_is_over(nest.get(), pos))
            {
                // current nest is full
                // have to allocate new nest and link it
                nest_t* new_nest = alloc_node();
                nest_t* old_nest = CAS(nest->next_, new_nest, (nest_t*)0);
                if (!old_nest)
                {
                    head_.CAS(new_nest, nest.get());
                }
                else
                {
                    head_.CAS(old_nest, nest.get());
                    free_node(new_nest);
                }
                // update current nest
                nest = head_;
                pos = nest->enqueue_pos_hint_;
            }
            // check whether current pos is empty
            if (check_enqueue_pos(nest.get(), pos))
            {
                // try to write the value
                if (!CAS_fail(nest->buffer_[pos], value, 0))
                {
                    // plain store to enqueue_pos_hint_
                    nest->enqueue_pos_hint_ = pos + 1;
                    return;
                }
            }
            // here some sophisticated logic
            // to determine enqueue pos
            // involves look-ahead, binary-search etc...
            pos = enqueue_pos_oracle(nest.get(), pos);
        }
        while (true);
    }

    void* dequeue()
    {
        // here strong acquire of tail
        // cached in tls, so atomic rmw issued
        // only once for nest
        local_ptr nest (tail_);
        for (;;)
        {
            unsigned pos = nest->dequeue_pos_;
            if (nest_is_over(nest.get(), pos))
            {
                // current nest is fully consumed
                // try to switch to next nest
                nest_t* next = nest->next_;
                if (!next)
                    return 0;
                tail_.CAS(next, nest.get());
                nest = tail_;
                // re-read dequeue pos in the new nest
                continue;
            }
            // read current value
            void* value = nest->buffer_[pos];
            if (!value)
                return 0;
            // try to increment dequeue pos
            if (!CAS_fail(nest->dequeue_pos_, pos + 1, pos))
                return value;
        }
    }
};
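The sketch above depends on PDR smart pointers and helpers that are not shown. For experimentation, here is a self-contained C++ simplification, assuming `std::uintptr_t` values with 0 reserved. It deliberately swaps out one part of the design: nests are never recycled or freed until the queue is destroyed, which sidesteps ABA and premature reuse (what PDR and the freelist handle in the post) at the cost of memory proportional to the total number of values ever enqueued. The oracle is reduced to a linear scan from the hint; `nested_list_queue`, `try_dequeue` and `mpmc_demo` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical simplification: nests are freed only in the destructor,
// so there is no ABA or premature reuse, but memory is never recycled.
class nested_list_queue {
    static constexpr std::size_t kNestSize = 64;  // small, to exercise nest switching
    struct nest {
        std::atomic<std::uintptr_t> buffer[kNestSize];
        std::atomic<std::size_t> enqueue_pos_hint{0};  // may lag, never runs ahead
        std::atomic<std::size_t> dequeue_pos{0};
        std::atomic<nest*> next{nullptr};
        nest() { for (auto& s : buffer) s.store(0, std::memory_order_relaxed); }
    };
    nest* first_;              // oldest nest, kept only for cleanup
    std::atomic<nest*> head_;  // nest being filled
    std::atomic<nest*> tail_;  // nest being consumed
public:
    nested_list_queue() : first_(new nest), head_(first_), tail_(first_) {}
    ~nested_list_queue() {
        while (first_) { nest* n = first_->next.load(); delete first_; first_ = n; }
    }
    nested_list_queue(const nested_list_queue&) = delete;
    nested_list_queue& operator=(const nested_list_queue&) = delete;

    void enqueue(std::uintptr_t v) {
        assert(v != 0);  // 0 is the reserved "empty slot" value
        nest* n = head_.load(std::memory_order_acquire);
        std::size_t pos = n->enqueue_pos_hint.load(std::memory_order_relaxed);
        for (;;) {
            // Oracle reduced to a linear scan for the first empty slot.
            while (pos < kNestSize && n->buffer[pos].load(std::memory_order_acquire) != 0) ++pos;
            if (pos == kNestSize) {  // nest is full: link a successor, advance head_
                nest* fresh = new nest;
                nest* linked = nullptr;
                if (n->next.compare_exchange_strong(linked, fresh, std::memory_order_acq_rel))
                    linked = fresh;
                else delete fresh;  // another producer linked first; help advance to its nest
                nest* expected = n;
                head_.compare_exchange_strong(expected, linked, std::memory_order_acq_rel);
                n = head_.load(std::memory_order_acquire);
                pos = n->enqueue_pos_hint.load(std::memory_order_relaxed);
                continue;
            }
            std::uintptr_t empty = 0;
            if (n->buffer[pos].compare_exchange_strong(empty, v, std::memory_order_release,
                                                       std::memory_order_relaxed)) {
                n->enqueue_pos_hint.store(pos + 1, std::memory_order_relaxed);
                return;
            }
        }
    }
    bool try_dequeue(std::uintptr_t& out) {
        nest* n = tail_.load(std::memory_order_acquire);
        for (;;) {
            std::size_t pos = n->dequeue_pos.load(std::memory_order_acquire);
            if (pos == kNestSize) {  // fully consumed: move to the next nest, if any
                nest* next = n->next.load(std::memory_order_acquire);
                if (!next) return false;
                nest* expected = n;
                tail_.compare_exchange_strong(expected, next, std::memory_order_acq_rel);
                n = tail_.load(std::memory_order_acquire);
                continue;
            }
            const std::uintptr_t v = n->buffer[pos].load(std::memory_order_acquire);
            if (v == 0) return false;  // nothing enqueued at this position yet (as observed)
            if (n->dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel)) {
                out = v;
                return true;
            }
        }
    }
};

// Usage example: every value 1..total must come out exactly once.
inline bool mpmc_demo(int producers, int consumers, std::uintptr_t per_producer) {
    nested_list_queue q;
    const std::uintptr_t total = std::uintptr_t(producers) * per_producer;
    std::atomic<std::uintptr_t> count{0}, sum{0};
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&q, p, per_producer] {
            for (std::uintptr_t i = 1; i <= per_producer; ++i) q.enqueue(std::uintptr_t(p) * per_producer + i);
        });
    for (int c = 0; c < consumers; ++c)
        threads.emplace_back([&q, &count, &sum, total] {
            std::uintptr_t v = 0;
            while (count.load() < total)
                if (q.try_dequeue(v)) { sum.fetch_add(v); count.fetch_add(1); }
        });
    for (auto& t : threads) t.join();
    return count.load() == total && sum.load() == total * (total + 1) / 2;
}
```

Only a full nest ever gets a successor, so a consumer that reads 0 in a nest with free slots can report "empty" without looking further. `try_dequeue` may also return `false` while an enqueue is still in flight, which is why the demo's consumers keep polling.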
I think that nested-list based lock-free data structures are very interesting and promising. I haven't seen this approach before. Surely other lock-free data structures can be built on top of a nested list...
Comments/suggestions/thoughts are welcome ;)
Dmitriy V'jukov
dvyukov
5/18/2007 12:38:11 PM
"Dmitriy Vyukov" <dvyukov@gmail.com> wrote in message news:1179637710.646771.295900@y80g2000hsf.googlegroups.com...
> On May 20, 8:02 am, "Chris Thomasson" <cris...@comcast.net> wrote:
>> > Not the queuing scheme I invented for the distributed lock-free event
>> > multiplexer used in vZOOM.
>> > --- I can do an unbounded queue with no interlocked operations and no
>> > #LoadLoad, #StoreLoad or #LoadStore style memory barriers at all... I have
>> > not posted the code for this. ---
>> The multiplexer/demultiplexer distributes the messages from multiple
>> producers to multiple consumers.
>
> Let me guess :)
> You dedicate one thread as the multiplexer/demultiplexer, and use 2*N SPSC
> queues (not N*(N-1)). So every other thread has a queue to the
> multiplexer/demultiplexer and one from it, and no thread needs to do
> event multiplexing manually.
> ?
Well, any thread can act as a multiplexer or demultiplexer. Any thread can receive messages from any other thread. A thread needs to register with another thread for communication. So, when I create a thread, I use PDR to walk a linked list of active thread structures, and push message descriptors onto itself and each thread. Now the newly created thread can multicast a single message to any other thread. Basically, you end up with any thread being able to multicast to any other thread, using nothing but atomic-operation-free and membar-free (i.e., except for #StoreStore) single-producer/single-consumer queues.
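Chris does not post his queue, so the following is only a generic stand-in: an unbounded single-producer/single-consumer linked-list queue in C++ (names hypothetical) in which neither side performs an atomic read-modify-write. Two caveats: C++ release/acquire is stronger than the #StoreStore-only ordering described above, and per-node `new`/`delete` means the allocator may synchronize internally.

```cpp
#include <atomic>
#include <thread>
#include <utility>

// Unbounded SPSC queue: a linked list with a stub node. The producer only
// touches tail_, the consumer only touches head_; no atomic RMW is used.
template <typename T>
class spsc_queue {
    struct node {
        std::atomic<node*> next{nullptr};
        T value{};
    };
    node* head_;  // consumer side: stub node whose successor is the oldest element
    node* tail_;  // producer side: most recently linked node

public:
    spsc_queue() : head_(new node), tail_(head_) {}
    ~spsc_queue() {  // assumes both threads are done with the queue
        while (head_) {
            node* next = head_->next.load(std::memory_order_relaxed);
            delete head_;
            head_ = next;
        }
    }
    spsc_queue(const spsc_queue&) = delete;
    spsc_queue& operator=(const spsc_queue&) = delete;

    void push(T v) {  // producer thread only
        node* n = new node;
        n->value = std::move(v);
        tail_->next.store(n, std::memory_order_release);  // publishes n->value
        tail_ = n;
    }

    bool pop(T& out) {  // consumer thread only
        node* n = head_->next.load(std::memory_order_acquire);
        if (!n) return false;
        out = std::move(n->value);
        delete head_;  // the producer never touches a node again after linking its successor
        head_ = n;     // n becomes the new stub
        return true;
    }
};

// Usage example: one producer thread; the calling thread consumes and checks FIFO order.
inline bool spsc_demo(int count) {
    spsc_queue<int> q;
    std::thread producer([&q, count] {
        for (int i = 0; i < count; ++i) q.push(i);
    });
    bool in_order = true;
    int expected = 0, v = 0;
    while (expected < count) {
        if (q.pop(v)) {
            if (v != expected) in_order = false;
            ++expected;
        }
    }
    producer.join();
    return in_order;
}
```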
Chris
5/20/2007 9:52:35 PM
On 21 May, 01:52, "Chris Thomasson" <cris...@comcast.net> wrote:
> Well, any thread can act as a multiplexer or demultiplexer. Any thread can
> receive messages from any other thread. A thread needs to register with
> another thread for communication. So, when I create a thread, I use PDR to
> walk a linked list of active thread structures, and push message
> descriptors onto itself and each thread. Now the newly created thread can
> multicast a single message to any other thread. Basically, you end up with
> any thread being able to multicast to any other thread, using nothing but
> atomic-operation-free and membar-free (i.e., except for #StoreStore)
> single-producer/single-consumer queues.
So you create N^2 queues... Don't you think that this is too costly? For example, if I have 4 cores and 4 threads, then I would have 16 queues. So... this is acceptable. But it is a common pattern to create more threads than cores. For example, if I have 32 threads, I would have about 1000 queues (32^2 = 1024) and every thread would have to multiplex 31 queues... This is debatable. Maybe on ccNUMA this is reasonable, but on a few-core desktop or server, IMHO, it is not worth it...
Btw, your solution is not producer-consumer, strictly speaking, because there is no load balancing. And it is not so easy to attach nontrivial (not round-robin) load balancing; in any case, this increases overhead further...
So I think your solution is for a different problem. It is more a solution to messaging. My solution is more for producer-consumer.
And maybe even for messaging I would prefer a solution based on MPSC queues on top of a LIFO stack with reverse consuming. It has only 1 CAS per enqueue and 0 (amortized) CAS per dequeue. After all, a CAS is only a CAS; it is not a call to a remote web service over SOAP :) And you can easily exceed the cost of a CAS if you have O(N) complexity and do a lot of other work instead of a single CAS... This is just my opinion. But I target few-core systems more than many-core ccNUMA systems; I just haven't worked with them...
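A minimal C++ sketch of the MPSC scheme mentioned above, assuming a Treiber-style stack: producers push with one successful CAS, and the single consumer detaches the whole stack with one atomic exchange and reverses it into FIFO order. Strictly, the exchange is itself an atomic RMW, but it is paid once per batch rather than per element, which is where the amortized-zero figure comes from. The class and demo names are hypothetical. Because the consumer never pops individual nodes off the shared stack, this variant avoids the classic ABA problem of Treiber pop.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>
#include <vector>

// MPSC queue on top of a LIFO stack with reverse consuming.
template <typename T>
class mpsc_lifo_queue {
    struct node {
        T value;
        node* next;
    };
    std::atomic<node*> stack_{nullptr};  // shared LIFO, newest node first
    node* batch_ = nullptr;              // consumer-private, oldest node first

    static void free_chain(node* n) {
        while (n) { node* next = n->next; delete n; n = next; }
    }

public:
    mpsc_lifo_queue() = default;
    ~mpsc_lifo_queue() {  // assumes no concurrent users remain
        free_chain(stack_.load(std::memory_order_acquire));
        free_chain(batch_);
    }
    mpsc_lifo_queue(const mpsc_lifo_queue&) = delete;
    mpsc_lifo_queue& operator=(const mpsc_lifo_queue&) = delete;

    void push(T v) {  // any number of producer threads
        node* n = new node{std::move(v), stack_.load(std::memory_order_relaxed)};
        // On failure, n->next is refreshed with the current top and we retry.
        while (!stack_.compare_exchange_weak(n->next, n, std::memory_order_release,
                                             std::memory_order_relaxed)) {
        }
    }

    bool pop(T& out) {  // single consumer thread only
        if (!batch_) {
            // One atomic exchange detaches everything pushed so far...
            node* lifo = stack_.exchange(nullptr, std::memory_order_acquire);
            // ...and reversing the chain restores FIFO order.
            while (lifo) {
                node* next = lifo->next;
                lifo->next = batch_;
                batch_ = lifo;
                lifo = next;
            }
            if (!batch_) return false;
        }
        node* n = batch_;
        batch_ = n->next;
        out = std::move(n->value);
        delete n;
        return true;
    }
};

// Usage example: each producer pushes (id, 0..per_producer-1); the consumer
// checks that every producer's items arrive in order.
inline bool mpsc_demo(int producers, int per_producer) {
    mpsc_lifo_queue<std::pair<int, int>> q;
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&q, p, per_producer] {
            for (int i = 0; i < per_producer; ++i) q.push({p, i});
        });
    std::vector<int> next_seq(producers);
    bool in_order = true;
    for (int received = 0; received < producers * per_producer;) {
        std::pair<int, int> item;
        if (q.pop(item)) {
            assert(item.first >= 0 && item.first < producers);
            if (item.second != next_seq[item.first]) in_order = false;
            next_seq[item.first] = item.second + 1;
            ++received;
        }
    }
    for (auto& t : threads) t.join();
    return in_order;
}
```

The reversal is O(batch size) work done entirely by the consumer, so it is cheap per element but can cause latency spikes when a large batch accumulates, which is the "lengthy reversals" concern raised later in the thread.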
Dmitriy V'jukov
Dmitriy
5/22/2007 11:29:58 AM
"Dmitriy Vyukov" <dvyukov@gmail.com> wrote in message news:1179833398.263497.199890@z24g2000prd.googlegroups.com...
> On 21 May, 01:52, "Chris Thomasson" <cris...@comcast.net> wrote:
>
>> Well, any thread can act as a multiplexer or demultiplexer. Any thread
>> can receive messages from any other thread. A thread needs to register
>> with another thread for communication. So, when I create a thread, I use
>> PDR to walk a linked list of active thread structures, and push message
>> descriptors onto itself and each thread. Now the newly created thread
>> can multicast a single message to any other thread. Basically, you end
>> up with any thread being able to multicast to any other thread, using
>> nothing but atomic-operation-free and membar-free (i.e., except for
>> #StoreStore) single-producer/single-consumer queues.
>
> So you create N^2 queues...
> Don't you think that this is too costly? [...]
It totally depends on how the user sets up their environment. A common setup is to use a single thread as the multiplexer/demultiplexer. Another possible setup is to allow several threads, or every thread, to act as the multiplexer/demultiplexer. It's usually useful to have several threads multicasting so that your messages don't get bottlenecked, which can happen if you use a single thread.
I need to clarify this statement:
"So, when I create a thread, I use PDR to walk a linked list of active thread structures, and push message descriptors onto itself and each thread."
which should read:
"So, when I create a thread which requests to act as a multiplexer/demultiplexer, I use PDR to walk a linked list of active thread structures, and push message descriptors onto itself and each thread."
A thread that does not request to act as a multiplexer/demultiplexer does not need to walk the thread list or push any message descriptors. It can just produce messages which get multicast to every other thread; however, it will NOT be doing the multicasting itself...
> Btw, your solution is not producer-consumer, strictly speaking, because
> there is no load balancing. And it is not so easy to attach nontrivial
> (not round-robin) load balancing; in any case, this increases overhead
> further...
There are load-balancing mechanisms if the user chooses to use them.
> So I think your solution is for a different problem.
> It is more a solution to messaging. My solution is more for producer-consumer.
Well, I can produce a single message which can get multicast to every other thread. This can work. The difference is whether the thread can multicast the message itself, or not.
> And maybe even for messaging I would prefer a solution based on MPSC
> queues on top of a LIFO stack with reverse consuming.
I remember when Joe Seigh mentioned this idea a while back:
[...]
http://groups.google.ca/group/comp.programming.threads/msg/a359bfb41c68b98b
Yup. The idea has been around, and it works... I just wonder about lengthy reversals...
Chris
5/22/2007 12:23:59 PM
Dmitriy Vyukov wrote:
> On 21 May, 01:52, "Chris Thomasson" <cris...@comcast.net> wrote:
>
>> Well, any thread can act as a multiplexer or demultiplexer. Any thread can
>> receive messages from any other thread. A thread needs to register with
>> another thread for communication. So, when I create a thread, I use PDR to
>> walk a linked list of active thread structures, and push message
>> descriptors onto itself and each thread. Now the newly created thread can
>> multicast a single message to any other thread. Basically, you end up with
>> any thread being able to multicast to any other thread, using nothing but
>> atomic-operation-free and membar-free (i.e., except for #StoreStore)
>> single-producer/single-consumer queues.
>
> So you create N^2 queues...
> Don't you think that this is too costly? For example, if I have 4 cores
> and 4 threads, then I would have 16 queues. So... this is acceptable.
> But it is a common pattern to create more threads than cores. For example,
> if I have 32 threads, I would have about 1000 queues and every thread would
> have to multiplex 31 queues...
In this type of design, you would normally only have N^2 queues where N is the number of cores, not the number of threads. The scheduler for each core can distribute messages to its own threads without any additional synchronization (this assumes that the scheduler and the message passing implementation are cooperating, which probably requires that they are both implemented by the same library, language implementation, or OS).
The most complicated part of making such a design work well would be load balancing, not the messaging itself.
--
David Hopwood <david.hopwood@industrial-designers.co.uk>
David
5/22/2007 3:32:19 PM
On May 22, 7:32 pm, David Hopwood <david.hopw...@industrial-designers.co.uk> wrote:
> In this type of design, you would normally only have N^2 queues where > N is the number of cores, not the number of threads. The scheduler for > each core can distribute messages to its own threads without any additional > synchronization (this assumes that the scheduler and the message passing > implementation are cooperating, which probably requires that they are > both implemented by the same library, language implementation, or OS).
It seems that you are talking about some specific situation... I don't want to require the user of this solution to bind threads to cores. And I certainly don't want to make any intrusion into the language or the OS core. So all I have is threads. IMHO, the fewer the assumptions, the more usable the solution. And for a particular application, I normally don't want to bind threads to cores either, because it prevents the OS from doing optimal dynamic scheduling (which is, in a sense, load balancing).
Dmitriy V'jukov
Dmitriy
5/22/2007 7:04:40 PM
On May 22, 10:03 pm, "Chris Thomasson" <cris...@comcast.net> wrote:
> [...]
>
> > void enqueue(void* value)
> > {
> > // here strong acquire of head
> > // cached in tls, so atomic rmw issued
> > // only once for nest
> > local_ptr nest (head_);
> > ^^^^^^^^^^
>
> So, the local_ptr's in the example are all in per-thread structures whose
> pointers are stored in TLS? For the lifetime of a thread?
>
> Or, do you have to acquire a strong PDR reference 1 time per call into
> enqueue/dequeue?
>
> Please clarify... Thanks.
Here I mean something like this: http://groups.google.com/group/comp.programming.threads/msg/91d01886ae690185
I.e. I mean something like this:
void enqueue(local_mediator* m, void* value)
{
    local_ptr nest (m, head_);
    // ...
}
void producer_thread()
{
    // ...
    local_mediator m; // <-- here cached acquired pointer
    while (true)
    {
        queue.enqueue(&m, value);
    }
    // ...
}
So here we acquire only a weak reference, not a strong one.
But certainly we can store this cached pointer in TLS instead. Then I mean something like:
void arbitrary_user_thread_func()
{
    // here we can initialize all per-thread structures
    // and in destructor we will deinitialize and release all cached references
    my_library_thread_initializer initializer;
    // ...
    // here arbitrary user work
    // including calls to my shared queue
    // ...
}
If it is a "well-known" queue in the program, then we can place a local_mediator object on the stack of every thread that works with the queue. This is the better variant. If it is not a "well-known" queue, then we must use TLS.
I am also thinking of the following scheme: we could avoid PDR here entirely. Then nests would never be freed to the system allocator, only recycled. But then we must fix ABA in some other way: for enqueueing, for dequeueing, for moving the head pointer and for moving the tail pointer. I have fixed some of these, but not all. Anyway, the algorithm becomes _very_ messy without PDR, so I don't think eliminating PDR is worth it...
Dmitriy V'jukov
Dmitriy
5/23/2007 6:57:50 AM