This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.01
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '3.01';
7 $VERSION = eval $VERSION;
8
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
14
15 # Predeclarations for internal functions
16 my ($validate_count, $validate_index);
17
18 # Create a new queue possibly pre-populated with items
19 sub new
20 {
21     my $class = shift;
22     my @queue :shared = map { shared_clone($_) } @_;
23     my %self :shared = ( 'queue' => \@queue );
24     return bless(\%self, $class);
25 }
26
27 # Add items to the tail of a queue
28 sub enqueue
29 {
30     my $self = shift;
31     lock(%$self);
32     if ($$self{'ENDED'}) {
33         require Carp;
34         Carp::croak("'enqueue' method called on queue that has been 'end'ed");
35     }
36     push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
37         and cond_signal(%$self);
38 }
39
40 # Return a count of the number of items on a queue
41 sub pending
42 {
43     my $self = shift;
44     lock(%$self);
45     return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
46     return scalar(@{$$self{'queue'}});
47 }
48
49 # Indicate that no more data will enter the queue
50 sub end
51 {
52     my $self = shift;
53     lock $self;
54     # No more data is coming
55     $$self{'ENDED'} = 1;
56     # Try to release at least one blocked thread
57     cond_signal(%$self);
58 }
59
60 # Return 1 or more items from the head of a queue, blocking if needed
61 sub dequeue
62 {
63     my $self = shift;
64     lock(%$self);
65     my $queue = $$self{'queue'};
66
67     my $count = @_ ? $validate_count->(shift) : 1;
68
69     # Wait for requisite number of items
70     cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
71     cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
72
73     # If no longer blocking, try getting whatever is left on the queue
74     return $self->dequeue_nb($count) if ($$self{'ENDED'});
75
76     # Return single item
77     return shift(@$queue) if ($count == 1);
78
79     # Return multiple items
80     my @items;
81     push(@items, shift(@$queue)) for (1..$count);
82     return @items;
83 }
84
85 # Return items from the head of a queue with no blocking
86 sub dequeue_nb
87 {
88     my $self = shift;
89     lock(%$self);
90     my $queue = $$self{'queue'};
91
92     my $count = @_ ? $validate_count->(shift) : 1;
93
94     # Return single item
95     return shift(@$queue) if ($count == 1);
96
97     # Return multiple items
98     my @items;
99     for (1..$count) {
100         last if (! @$queue);
101         push(@items, shift(@$queue));
102     }
103     return @items;
104 }
105
106 # Return an item without removing it from a queue
107 sub peek
108 {
109     my $self = shift;
110     lock(%$self);
111     my $index = @_ ? $validate_index->(shift) : 0;
112     return $$self{'queue'}[$index];
113 }
114
115 # Insert items anywhere into a queue
116 sub insert
117 {
118     my $self = shift;
119     lock(%$self);
120
121     if ($$self{'ENDED'}) {
122         require Carp;
123         Carp::croak("'insert' method called on queue that has been 'end'ed");
124     }
125
126     my $queue = $$self{'queue'};
127
128     my $index = $validate_index->(shift);
129
130     return if (! @_);   # Nothing to insert
131
132     # Support negative indices
133     if ($index < 0) {
134         $index += @$queue;
135         if ($index < 0) {
136             $index = 0;
137         }
138     }
139
140     # Dequeue items from $index onward
141     my @tmp;
142     while (@$queue > $index) {
143         unshift(@tmp, pop(@$queue))
144     }
145
146     # Add new items to the queue
147     push(@$queue, map { shared_clone($_) } @_);
148
149     # Add previous items back onto the queue
150     push(@$queue, @tmp);
151
152     # Soup's up
153     cond_signal(%$self);
154 }
155
156 # Remove items from anywhere in a queue
157 sub extract
158 {
159     my $self = shift;
160     lock(%$self);
161     my $queue = $$self{'queue'};
162
163     my $index = @_ ? $validate_index->(shift) : 0;
164     my $count = @_ ? $validate_count->(shift) : 1;
165
166     # Support negative indices
167     if ($index < 0) {
168         $index += @$queue;
169         if ($index < 0) {
170             $count += $index;
171             return if ($count <= 0);            # Beyond the head of the queue
172             return $self->dequeue_nb($count);  # Extract from the head
173         }
174     }
175
176     # Dequeue items from $index+$count onward
177     my @tmp;
178     while (@$queue > ($index+$count)) {
179         unshift(@tmp, pop(@$queue))
180     }
181
182     # Extract desired items
183     my @items;
184     unshift(@items, pop(@$queue)) while (@$queue > $index);
185
186     # Add back any removed items
187     push(@$queue, @tmp);
188
189     # Return single item
190     return $items[0] if ($count == 1);
191
192     # Return multiple items
193     return @items;
194 }
195
196 ### Internal Functions ###
197
198 # Check value of the requested index
199 $validate_index = sub {
200     my $index = shift;
201
202     if (! defined($index) ||
203         ! looks_like_number($index) ||
204         (int($index) != $index))
205     {
206         require Carp;
207         my ($method) = (caller(1))[3];
208         $method =~ s/Thread::Queue:://;
209         $index = 'undef' if (! defined($index));
210         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
211     }
212
213     return $index;
214 };
215
216 # Check value of the requested count
217 $validate_count = sub {
218     my $count = shift;
219
220     if (! defined($count) ||
221         ! looks_like_number($count) ||
222         (int($count) != $count) ||
223         ($count < 1))
224     {
225         require Carp;
226         my ($method) = (caller(1))[3];
227         $method =~ s/Thread::Queue:://;
228         $count = 'undef' if (! defined($count));
229         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
230     }
231
232     return $count;
233 };
234
235 1;
236
237 =head1 NAME
238
239 Thread::Queue - Thread-safe queues
240
241 =head1 VERSION
242
243 This document describes Thread::Queue version 3.01
244
245 =head1 SYNOPSIS
246
247     use strict;
248     use warnings;
249
250     use threads;
251     use Thread::Queue;
252
253     my $q = Thread::Queue->new();    # A new empty queue
254
255     # Worker thread
256     my $thr = threads->create(
257         sub {
258             # Thread will loop until no more work
259             while (defined(my $item = $q->dequeue())) {
260                 # Do work on $item
261                 ...
262             }
263         }
264     );
265
266     # Send work to the thread
267     $q->enqueue($item1, ...);
268     # Signal that there is no more work to be sent
269     $q->end();
270     # Join up with the thread when it finishes
271     $thr->join();
272
273     ...
274
275     # Count of items in the queue
276     my $left = $q->pending();
277
278     # Non-blocking dequeue
279     if (defined(my $item = $q->dequeue_nb())) {
280         # Work on $item
281     }
282
283     # Get the second item in the queue without dequeuing anything
284     my $item = $q->peek(1);
285
286     # Insert two items into the queue just behind the head
287     $q->insert(1, $item1, $item2);
288
289     # Extract the last two items on the queue
290     my ($item1, $item2) = $q->extract(-2, 2);
291
292 =head1 DESCRIPTION
293
294 This module provides thread-safe FIFO queues that can be accessed safely by
295 any number of threads.
296
297 Any data types supported by L<threads::shared> can be passed via queues:
298
299 =over
300
301 =item Ordinary scalars
302
303 =item Array refs
304
305 =item Hash refs
306
307 =item Scalar refs
308
309 =item Objects based on the above
310
311 =back
312
313 Ordinary scalars are added to queues as they are.
314
315 If not already thread-shared, the other complex data types will be cloned
316 (recursively, if needed, and including any C<bless>ings and read-only
317 settings) into thread-shared structures before being placed onto a queue.
318
319 For example, the following would cause L<Thread::Queue> to create a empty,
320 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
321 and 'baz' from C<@ary> into it, and then place that shared reference onto
322 the queue:
323
324     my @ary = qw/foo bar baz/;
325     $q->enqueue(\@ary);
326
327 However, for the following, the items are already shared, so their references
328 are added directly to the queue, and no cloning takes place:
329
330     my @ary :shared = qw/foo bar baz/;
331     $q->enqueue(\@ary);
332
333     my $obj = &shared({});
334     $$obj{'foo'} = 'bar';
335     $$obj{'qux'} = 99;
336     bless($obj, 'My::Class');
337     $q->enqueue($obj);
338
339 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
340
341 =head1 QUEUE CREATION
342
343 =over
344
345 =item ->new()
346
347 Creates a new empty queue.
348
349 =item ->new(LIST)
350
351 Creates a new queue pre-populated with the provided list of items.
352
353 =back
354
355 =head1 BASIC METHODS
356
357 The following methods deal with queues on a FIFO basis.
358
359 =over
360
361 =item ->enqueue(LIST)
362
363 Adds a list of items onto the end of the queue.
364
365 =item ->dequeue()
366
367 =item ->dequeue(COUNT)
368
369 Removes the requested number of items (default is 1) from the head of the
370 queue, and returns them.  If the queue contains fewer than the requested
371 number of items, then the thread will be blocked until the requisite number
372 of items are available (i.e., until other threads <enqueue> more items).
373
374 =item ->dequeue_nb()
375
376 =item ->dequeue_nb(COUNT)
377
378 Removes the requested number of items (default is 1) from the head of the
379 queue, and returns them.  If the queue contains fewer than the requested
380 number of items, then it immediately (i.e., non-blocking) returns whatever
381 items there are on the queue.  If the queue is empty, then C<undef> is
382 returned.
383
384 =item ->pending()
385
386 Returns the number of items still in the queue.  Returns C<undef> if the queue
387 has been ended (see below), and there are no more items in the queue.
388
389 =item ->end()
390
391 Declares that no more items will be added to the queue.
392
393 All threads blocking on C<dequeue()> calls will be unblocked with any
394 remaining items in the queue and/or C<undef> being returned.  Any subsequent
395 calls to C<dequeue()> will behave like C<dequeue_nb()>.
396
397 Once ended, no more items may be placed in the queue.
398
399 =back
400
401 =head1 ADVANCED METHODS
402
403 The following methods can be used to manipulate items anywhere in a queue.
404
405 To prevent the contents of a queue from being modified by another thread
406 while it is being examined and/or changed, L<lock|threads::shared/"lock
407 VARIABLE"> the queue inside a local block:
408
409     {
410         lock($q);   # Keep other threads from changing the queue's contents
411         my $item = $q->peek();
412         if ($item ...) {
413             ...
414         }
415     }
416     # Queue is now unlocked
417
418 =over
419
420 =item ->peek()
421
422 =item ->peek(INDEX)
423
424 Returns an item from the queue without dequeuing anything.  Defaults to the
425 the head of queue (at index position 0) if no index is specified.  Negative
426 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
427 is the end of the queue, -2 is next to last, and so on).
428
429 If no items exists at the specified index (i.e., the queue is empty, or the
430 index is beyond the number of items on the queue), then C<undef> is returned.
431
432 Remember, the returned item is not removed from the queue, so manipulating a
433 C<peek>ed at reference affects the item on the queue.
434
435 =item ->insert(INDEX, LIST)
436
437 Adds the list of items to the queue at the specified index position (0
438 is the head of the list).  Any existing items at and beyond that position are
439 pushed back past the newly added items:
440
441     $q->enqueue(1, 2, 3, 4);
442     $q->insert(1, qw/foo bar/);
443     # Queue now contains:  1, foo, bar, 2, 3, 4
444
445 Specifying an index position greater than the number of items in the queue
446 just adds the list to the end.
447
448 Negative index positions are supported:
449
450     $q->enqueue(1, 2, 3, 4);
451     $q->insert(-2, qw/foo bar/);
452     # Queue now contains:  1, 2, foo, bar, 3, 4
453
454 Specifying a negative index position greater than the number of items in the
455 queue adds the list to the head of the queue.
456
457 =item ->extract()
458
459 =item ->extract(INDEX)
460
461 =item ->extract(INDEX, COUNT)
462
463 Removes and returns the specified number of items (defaults to 1) from the
464 specified index position in the queue (0 is the head of the queue).  When
465 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
466
467 This method is non-blocking, and will return only as many items as are
468 available to fulfill the request:
469
470     $q->enqueue(1, 2, 3, 4);
471     my $item  = $q->extract(2)     # Returns 3
472                                    # Queue now contains:  1, 2, 4
473     my @items = $q->extract(1, 3)  # Returns (2, 4)
474                                    # Queue now contains:  1
475
476 Specifying an index position greater than the number of items in the
477 queue results in C<undef> or an empty list being returned.
478
479     $q->enqueue('foo');
480     my $nada = $q->extract(3)      # Returns undef
481     my @nada = $q->extract(1, 3)   # Returns ()
482
483 Negative index positions are supported.  Specifying a negative index position
484 greater than the number of items in the queue may return items from the head
485 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
486 queue from the specified position (i.e. if queue size + index + count is
487 greater than zero):
488
489     $q->enqueue(qw/foo bar baz/);
490     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
491     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
492                                      # Queue now contains:  bar, baz
493     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
494
495 =back
496
497 =head1 NOTES
498
499 Queues created by L<Thread::Queue> can be used in both threaded and
500 non-threaded applications.
501
502 =head1 LIMITATIONS
503
504 Passing objects on queues may not work if the objects' classes do not support
505 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
506
507 Passing array/hash refs that contain objects may not work for Perl prior to
508 5.10.0.
509
510 =head1 SEE ALSO
511
512 Thread::Queue Discussion Forum on CPAN:
513 L<http://www.cpanforum.com/dist/Thread-Queue>
514
515 L<threads>, L<threads::shared>
516
517 Sample code in the I<examples> directory of this distribution on CPAN.
518
519 =head1 MAINTAINER
520
521 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
522
523 =head1 LICENSE
524
525 This program is free software; you can redistribute it and/or modify it under
526 the same terms as Perl itself.
527
528 =cut