Thread::Queue 2.09
[perl.git] / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.09';
7
8 use threads::shared 1.21;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
10
11 # Carp errors from threads::shared calls should complain about caller
12 our @CARP_NOT = ("threads::shared");
13
14 # Predeclarations for internal functions
15 my ($validate_count, $validate_index);
16
17 # Create a new queue possibly pre-populated with items
18 sub new
19 {
20     my $class = shift;
21     my @queue :shared = map { shared_clone($_) } @_;
22     return bless(\@queue, $class);
23 }
24
25 # Add items to the tail of a queue
26 sub enqueue
27 {
28     my $queue = shift;
29     lock(@$queue);
30     push(@$queue, map { shared_clone($_) } @_)
31         and cond_signal(@$queue);
32 }
33
34 # Return a count of the number of items on a queue
35 sub pending
36 {
37     my $queue = shift;
38     lock(@$queue);
39     return scalar(@$queue);
40 }
41
42 # Return 1 or more items from the head of a queue, blocking if needed
43 sub dequeue
44 {
45     my $queue = shift;
46     lock(@$queue);
47
48     my $count = @_ ? $validate_count->(shift) : 1;
49
50     # Wait for requisite number of items
51     cond_wait(@$queue) until (@$queue >= $count);
52     cond_signal(@$queue) if (@$queue > $count);
53
54     # Return single item
55     return shift(@$queue) if ($count == 1);
56
57     # Return multiple items
58     my @items;
59     push(@items, shift(@$queue)) for (1..$count);
60     return @items;
61 }
62
63 # Return items from the head of a queue with no blocking
64 sub dequeue_nb
65 {
66     my $queue = shift;
67     lock(@$queue);
68
69     my $count = @_ ? $validate_count->(shift) : 1;
70
71     # Return single item
72     return shift(@$queue) if ($count == 1);
73
74     # Return multiple items
75     my @items;
76     for (1..$count) {
77         last if (! @$queue);
78         push(@items, shift(@$queue));
79     }
80     return @items;
81 }
82
83 # Return an item without removing it from a queue
84 sub peek
85 {
86     my $queue = shift;
87     lock(@$queue);
88     my $index = @_ ? $validate_index->(shift) : 0;
89     return $$queue[$index];
90 }
91
92 # Insert items anywhere into a queue
93 sub insert
94 {
95     my $queue = shift;
96     lock(@$queue);
97
98     my $index = $validate_index->(shift);
99
100     return if (! @_);   # Nothing to insert
101
102     # Support negative indices
103     if ($index < 0) {
104         $index += @$queue;
105         if ($index < 0) {
106             $index = 0;
107         }
108     }
109
110     # Dequeue items from $index onward
111     my @tmp;
112     while (@$queue > $index) {
113         unshift(@tmp, pop(@$queue))
114     }
115
116     # Add new items to the queue
117     push(@$queue, map { shared_clone($_) } @_);
118
119     # Add previous items back onto the queue
120     push(@$queue, @tmp);
121
122     # Soup's up
123     cond_signal(@$queue);
124 }
125
126 # Remove items from anywhere in a queue
127 sub extract
128 {
129     my $queue = shift;
130     lock(@$queue);
131
132     my $index = @_ ? $validate_index->(shift) : 0;
133     my $count = @_ ? $validate_count->(shift) : 1;
134
135     # Support negative indices
136     if ($index < 0) {
137         $index += @$queue;
138         if ($index < 0) {
139             $count += $index;
140             return if ($count <= 0);            # Beyond the head of the queue
141             return $queue->dequeue_nb($count);  # Extract from the head
142         }
143     }
144
145     # Dequeue items from $index+$count onward
146     my @tmp;
147     while (@$queue > ($index+$count)) {
148         unshift(@tmp, pop(@$queue))
149     }
150
151     # Extract desired items
152     my @items;
153     unshift(@items, pop(@$queue)) while (@$queue > $index);
154
155     # Add back any removed items
156     push(@$queue, @tmp);
157
158     # Return single item
159     return $items[0] if ($count == 1);
160
161     # Return multiple items
162     return @items;
163 }
164
165 ### Internal Functions ###
166
167 # Check value of the requested index
168 $validate_index = sub {
169     my $index = shift;
170
171     if (! defined($index) ||
172         ! looks_like_number($index) ||
173         (int($index) != $index))
174     {
175         require Carp;
176         my ($method) = (caller(1))[3];
177         $method =~ s/Thread::Queue:://;
178         $index = 'undef' if (! defined($index));
179         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
180     }
181
182     return $index;
183 };
184
185 # Check value of the requested count
186 $validate_count = sub {
187     my $count = shift;
188
189     if (! defined($count) ||
190         ! looks_like_number($count) ||
191         (int($count) != $count) ||
192         ($count < 1))
193     {
194         require Carp;
195         my ($method) = (caller(1))[3];
196         $method =~ s/Thread::Queue:://;
197         $count = 'undef' if (! defined($count));
198         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
199     }
200
201     return $count;
202 };
203
204 1;
205
206 =head1 NAME
207
208 Thread::Queue - Thread-safe queues
209
210 =head1 VERSION
211
212 This document describes Thread::Queue version 2.09
213
214 =head1 SYNOPSIS
215
216     use strict;
217     use warnings;
218
219     use threads;
220     use Thread::Queue;
221
222     my $q = Thread::Queue->new();    # A new empty queue
223
224     # Worker thread
225     my $thr = threads->create(sub {
226                                 while (my $item = $q->dequeue()) {
227                                     # Do work on $item
228                                 }
229                              })->detach();
230
231     # Send work to the thread
232     $q->enqueue($item1, ...);
233
234
235     # Count of items in the queue
236     my $left = $q->pending();
237
238     # Non-blocking dequeue
239     if (defined(my $item = $q->dequeue_nb())) {
240         # Work on $item
241     }
242
243     # Get the second item in the queue without dequeuing anything
244     my $item = $q->peek(1);
245
246     # Insert two items into the queue just behind the head
247     $q->insert(1, $item1, $item2);
248
249     # Extract the last two items on the queue
250     my ($item1, $item2) = $q->extract(-2, 2);
251
252 =head1 DESCRIPTION
253
254 This module provides thread-safe FIFO queues that can be accessed safely by
255 any number of threads.
256
257 Any data types supported by L<threads::shared> can be passed via queues:
258
259 =over
260
261 =item Ordinary scalars
262
263 =item Array refs
264
265 =item Hash refs
266
267 =item Scalar refs
268
269 =item Objects based on the above
270
271 =back
272
273 Ordinary scalars are added to queues as they are.
274
275 If not already thread-shared, the other complex data types will be cloned
276 (recursively, if needed, and including any C<bless>ings and read-only
277 settings) into thread-shared structures before being placed onto a queue.
278
279 For example, the following would cause L<Thread::Queue> to create a empty,
280 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
281 and 'baz' from C<@ary> into it, and then place that shared reference onto
282 the queue:
283
284     my @ary = qw/foo bar baz/;
285     $q->enqueue(\@ary);
286
287 However, for the following, the items are already shared, so their references
288 are added directly to the queue, and no cloning takes place:
289
290     my @ary :shared = qw/foo bar baz/;
291     $q->enqueue(\@ary);
292
293     my $obj = &shared({});
294     $$obj{'foo'} = 'bar';
295     $$obj{'qux'} = 99;
296     bless($obj, 'My::Class');
297     $q->enqueue($obj);
298
299 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
300
301 =head1 QUEUE CREATION
302
303 =over
304
305 =item ->new()
306
307 Creates a new empty queue.
308
309 =item ->new(LIST)
310
311 Creates a new queue pre-populated with the provided list of items.
312
313 =back
314
315 =head1 BASIC METHODS
316
317 The following methods deal with queues on a FIFO basis.
318
319 =over
320
321 =item ->enqueue(LIST)
322
323 Adds a list of items onto the end of the queue.
324
325 =item ->dequeue()
326
327 =item ->dequeue(COUNT)
328
329 Removes the requested number of items (default is 1) from the head of the
330 queue, and returns them.  If the queue contains fewer than the requested
331 number of items, then the thread will be blocked until the requisite number
332 of items are available (i.e., until other threads <enqueue> more items).
333
334 =item ->dequeue_nb()
335
336 =item ->dequeue_nb(COUNT)
337
338 Removes the requested number of items (default is 1) from the head of the
339 queue, and returns them.  If the queue contains fewer than the requested
340 number of items, then it immediately (i.e., non-blocking) returns whatever
341 items there are on the queue.  If the queue is empty, then C<undef> is
342 returned.
343
344 =item ->pending()
345
346 Returns the number of items still in the queue.
347
348 =back
349
350 =head1 ADVANCED METHODS
351
352 The following methods can be used to manipulate items anywhere in a queue.
353
354 To prevent the contents of a queue from being modified by another thread
355 while it is being examined and/or changed, L<lock|threads::shared/"lock
356 VARIABLE"> the queue inside a local block:
357
358     {
359         lock($q);   # Keep other threads from changing the queue's contents
360         my $item = $q->peek();
361         if ($item ...) {
362             ...
363         }
364     }
365     # Queue is now unlocked
366
367 =over
368
369 =item ->peek()
370
371 =item ->peek(INDEX)
372
373 Returns an item from the queue without dequeuing anything.  Defaults to the
374 the head of queue (at index position 0) if no index is specified.  Negative
375 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
376 is the end of the queue, -2 is next to last, and so on).
377
378 If no items exists at the specified index (i.e., the queue is empty, or the
379 index is beyond the number of items on the queue), then C<undef> is returned.
380
381 Remember, the returned item is not removed from the queue, so manipulating a
382 C<peek>ed at reference affects the item on the queue.
383
384 =item ->insert(INDEX, LIST)
385
386 Adds the list of items to the queue at the specified index position (0
387 is the head of the list).  Any existing items at and beyond that position are
388 pushed back past the newly added items:
389
390     $q->enqueue(1, 2, 3, 4);
391     $q->insert(1, qw/foo bar/);
392     # Queue now contains:  1, foo, bar, 2, 3, 4
393
394 Specifying an index position greater than the number of items in the queue
395 just adds the list to the end.
396
397 Negative index positions are supported:
398
399     $q->enqueue(1, 2, 3, 4);
400     $q->insert(-2, qw/foo bar/);
401     # Queue now contains:  1, 2, foo, bar, 3, 4
402
403 Specifying a negative index position greater than the number of items in the
404 queue adds the list to the head of the queue.
405
406 =item ->extract()
407
408 =item ->extract(INDEX)
409
410 =item ->extract(INDEX, COUNT)
411
412 Removes and returns the specified number of items (defaults to 1) from the
413 specified index position in the queue (0 is the head of the queue).  When
414 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
415
416 This method is non-blocking, and will return only as many items as are
417 available to fulfill the request:
418
419     $q->enqueue(1, 2, 3, 4);
420     my $item  = $q->extract(2)     # Returns 3
421                                    # Queue now contains:  1, 2, 4
422     my @items = $q->extract(1, 3)  # Returns (2, 4)
423                                    # Queue now contains:  1
424
425 Specifying an index position greater than the number of items in the
426 queue results in C<undef> or an empty list being returned.
427
428     $q->enqueue('foo');
429     my $nada = $q->extract(3)      # Returns undef
430     my @nada = $q->extract(1, 3)   # Returns ()
431
432 Negative index positions are supported.  Specifying a negative index position
433 greater than the number of items in the queue may return items from the head
434 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
435 queue from the specified position (i.e. if queue size + index + count is
436 greater than zero):
437
438     $q->enqueue(qw/foo bar baz/);
439     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
440     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
441                                      # Queue now contains:  bar, baz
442     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
443
444 =back
445
446 =head1 NOTES
447
448 Queues created by L<Thread::Queue> can be used in both threaded and
449 non-threaded applications.
450
451 =head1 LIMITATIONS
452
453 Passing objects on queues may not work if the objects' classes do not support
454 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
455
456 Passing array/hash refs that contain objects may not work for Perl prior to
457 5.10.0.
458
459 =head1 SEE ALSO
460
461 Thread::Queue Discussion Forum on CPAN:
462 L<http://www.cpanforum.com/dist/Thread-Queue>
463
464 Annotated POD for Thread::Queue:
465 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.09/lib/Thread/Queue.pm>
466
467 Source repository:
468 L<http://code.google.com/p/thread-queue/>
469
470 L<threads>, L<threads::shared>
471
472 =head1 MAINTAINER
473
474 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
475
476 =head1 LICENSE
477
478 This program is free software; you can redistribute it and/or modify it under
479 the same terms as Perl itself.
480
481 =cut