This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.06
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '3.06';
7 $VERSION = eval $VERSION;
8
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
14
15 # Create a new queue possibly pre-populated with items
16 sub new
17 {
18     my $class = shift;
19     my @queue :shared = map { shared_clone($_) } @_;
20     my %self :shared = ( 'queue' => \@queue );
21     return bless(\%self, $class);
22 }
23
24 # Add items to the tail of a queue
25 sub enqueue
26 {
27     my $self = shift;
28     lock(%$self);
29
30     if ($$self{'ENDED'}) {
31         require Carp;
32         Carp::croak("'enqueue' method called on queue that has been 'end'ed");
33     }
34
35     # Block if queue size exceeds any specified limit
36     my $queue = $$self{'queue'};
37     cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
38
39     # Add items to queue, and then signal other threads
40     push(@$queue, map { shared_clone($_) } @_)
41         and cond_signal(%$self);
42 }
43
44 # Set or return the max. size for a queue
45 sub limit : lvalue
46 {
47     my $self = shift;
48     lock(%$self);
49     $$self{'LIMIT'};
50 }
51
52 # Return a count of the number of items on a queue
53 sub pending
54 {
55     my $self = shift;
56     lock(%$self);
57     return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
58     return scalar(@{$$self{'queue'}});
59 }
60
61 # Indicate that no more data will enter the queue
62 sub end
63 {
64     my $self = shift;
65     lock(%$self);
66     # No more data is coming
67     $$self{'ENDED'} = 1;
68     # Try to release at least one blocked thread
69     cond_signal(%$self);
70 }
71
72 # Return 1 or more items from the head of a queue, blocking if needed
73 sub dequeue
74 {
75     my $self = shift;
76     lock(%$self);
77     my $queue = $$self{'queue'};
78
79     my $count = @_ ? $self->_validate_count(shift) : 1;
80
81     # Wait for requisite number of items
82     cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
83     cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
84
85     # If no longer blocking, try getting whatever is left on the queue
86     return $self->dequeue_nb($count) if ($$self{'ENDED'});
87
88     # Return single item
89     return shift(@$queue) if ($count == 1);
90
91     # Return multiple items
92     my @items;
93     push(@items, shift(@$queue)) for (1..$count);
94     return @items;
95 }
96
97 # Return items from the head of a queue with no blocking
98 sub dequeue_nb
99 {
100     my $self = shift;
101     lock(%$self);
102     my $queue = $$self{'queue'};
103
104     my $count = @_ ? $self->_validate_count(shift) : 1;
105
106     # Return single item
107     return shift(@$queue) if ($count == 1);
108
109     # Return multiple items
110     my @items;
111     for (1..$count) {
112         last if (! @$queue);
113         push(@items, shift(@$queue));
114     }
115     return @items;
116 }
117
118 # Return items from the head of a queue, blocking if needed up to a timeout
119 sub dequeue_timed
120 {
121     my $self = shift;
122     lock(%$self);
123     my $queue = $$self{'queue'};
124
125     # Timeout may be relative or absolute
126     my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
127     # Convert to an absolute time for use with cond_timedwait()
128     if ($timeout < 32000000) {   # More than one year
129         $timeout += time();
130     }
131
132     my $count = @_ ? $self->_validate_count(shift) : 1;
133
134     # Wait for requisite number of items, or until timeout
135     while ((@$queue < $count) && ! $$self{'ENDED'}) {
136         last if (! cond_timedwait(%$self, $timeout));
137     }
138     cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
139
140     # Get whatever we need off the queue if available
141     return $self->dequeue_nb($count);
142 }
143
144 # Return an item without removing it from a queue
145 sub peek
146 {
147     my $self = shift;
148     lock(%$self);
149     my $index = @_ ? $self->_validate_index(shift) : 0;
150     return $$self{'queue'}[$index];
151 }
152
153 # Insert items anywhere into a queue
154 sub insert
155 {
156     my $self = shift;
157     lock(%$self);
158
159     if ($$self{'ENDED'}) {
160         require Carp;
161         Carp::croak("'insert' method called on queue that has been 'end'ed");
162     }
163
164     my $queue = $$self{'queue'};
165
166     my $index = $self->_validate_index(shift);
167
168     return if (! @_);   # Nothing to insert
169
170     # Support negative indices
171     if ($index < 0) {
172         $index += @$queue;
173         if ($index < 0) {
174             $index = 0;
175         }
176     }
177
178     # Dequeue items from $index onward
179     my @tmp;
180     while (@$queue > $index) {
181         unshift(@tmp, pop(@$queue))
182     }
183
184     # Add new items to the queue
185     push(@$queue, map { shared_clone($_) } @_);
186
187     # Add previous items back onto the queue
188     push(@$queue, @tmp);
189
190     # Soup's up
191     cond_signal(%$self);
192 }
193
194 # Remove items from anywhere in a queue
195 sub extract
196 {
197     my $self = shift;
198     lock(%$self);
199     my $queue = $$self{'queue'};
200
201     my $index = @_ ? $self->_validate_index(shift) : 0;
202     my $count = @_ ? $self->_validate_count(shift) : 1;
203
204     # Support negative indices
205     if ($index < 0) {
206         $index += @$queue;
207         if ($index < 0) {
208             $count += $index;
209             return if ($count <= 0);            # Beyond the head of the queue
210             return $self->dequeue_nb($count);  # Extract from the head
211         }
212     }
213
214     # Dequeue items from $index+$count onward
215     my @tmp;
216     while (@$queue > ($index+$count)) {
217         unshift(@tmp, pop(@$queue))
218     }
219
220     # Extract desired items
221     my @items;
222     unshift(@items, pop(@$queue)) while (@$queue > $index);
223
224     # Add back any removed items
225     push(@$queue, @tmp);
226
227     # Return single item
228     return $items[0] if ($count == 1);
229
230     # Return multiple items
231     return @items;
232 }
233
234 ### Internal Methods ###
235
236 # Check value of the requested index
237 sub _validate_index
238 {
239     my $self = shift;
240     my $index = shift;
241
242     if (! defined($index) ||
243         ! looks_like_number($index) ||
244         (int($index) != $index))
245     {
246         require Carp;
247         my ($method) = (caller(1))[3];
248         my $class_name = ref($self);
249         $method =~ s/$class_name\:://;
250         $index = 'undef' if (! defined($index));
251         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
252     }
253
254     return $index;
255 };
256
257 # Check value of the requested count
258 sub _validate_count
259 {
260     my $self = shift;
261     my $count = shift;
262
263     if (! defined($count) ||
264         ! looks_like_number($count) ||
265         (int($count) != $count) ||
266         ($count < 1))
267     {
268         require Carp;
269         my ($method) = (caller(1))[3];
270         my $class_name = ref($self);
271         $method =~ s/$class_name\:://;
272         $count = 'undef' if (! defined($count));
273         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
274     }
275
276     return $count;
277 };
278
279 # Check value of the requested timeout
280 sub _validate_timeout
281 {
282     my $self = shift;
283     my $timeout = shift;
284
285     if (! defined($timeout) ||
286         ! looks_like_number($timeout))
287     {
288         require Carp;
289         my ($method) = (caller(1))[3];
290         my $class_name = ref($self);
291         $method =~ s/$class_name\:://;
292         $timeout = 'undef' if (! defined($timeout));
293         Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
294     }
295
296     return $timeout;
297 };
298
299 1;
300
301 =head1 NAME
302
303 Thread::Queue - Thread-safe queues
304
305 =head1 VERSION
306
307 This document describes Thread::Queue version 3.06
308
309 =head1 SYNOPSIS
310
311     use strict;
312     use warnings;
313
314     use threads;
315     use Thread::Queue;
316
317     my $q = Thread::Queue->new();    # A new empty queue
318
319     # Worker thread
320     my $thr = threads->create(
321         sub {
322             # Thread will loop until no more work
323             while (defined(my $item = $q->dequeue())) {
324                 # Do work on $item
325                 ...
326             }
327         }
328     );
329
330     # Send work to the thread
331     $q->enqueue($item1, ...);
332     # Signal that there is no more work to be sent
333     $q->end();
334     # Join up with the thread when it finishes
335     $thr->join();
336
337     ...
338
339     # Count of items in the queue
340     my $left = $q->pending();
341
342     # Non-blocking dequeue
343     if (defined(my $item = $q->dequeue_nb())) {
344         # Work on $item
345     }
346
347     # Blocking dequeue with 5-second timeout
348     if (defined(my $item = $q->dequeue_timed(5))) {
349         # Work on $item
350     }
351
352     # Set a size for a queue
353     $q->limit = 5;
354
355     # Get the second item in the queue without dequeuing anything
356     my $item = $q->peek(1);
357
358     # Insert two items into the queue just behind the head
359     $q->insert(1, $item1, $item2);
360
361     # Extract the last two items on the queue
362     my ($item1, $item2) = $q->extract(-2, 2);
363
364 =head1 DESCRIPTION
365
366 This module provides thread-safe FIFO queues that can be accessed safely by
367 any number of threads.
368
369 Any data types supported by L<threads::shared> can be passed via queues:
370
371 =over
372
373 =item Ordinary scalars
374
375 =item Array refs
376
377 =item Hash refs
378
379 =item Scalar refs
380
381 =item Objects based on the above
382
383 =back
384
385 Ordinary scalars are added to queues as they are.
386
387 If not already thread-shared, the other complex data types will be cloned
388 (recursively, if needed, and including any C<bless>ings and read-only
389 settings) into thread-shared structures before being placed onto a queue.
390
391 For example, the following would cause L<Thread::Queue> to create a empty,
392 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
393 and 'baz' from C<@ary> into it, and then place that shared reference onto
394 the queue:
395
396     my @ary = qw/foo bar baz/;
397     $q->enqueue(\@ary);
398
399 However, for the following, the items are already shared, so their references
400 are added directly to the queue, and no cloning takes place:
401
402     my @ary :shared = qw/foo bar baz/;
403     $q->enqueue(\@ary);
404
405     my $obj = &shared({});
406     $$obj{'foo'} = 'bar';
407     $$obj{'qux'} = 99;
408     bless($obj, 'My::Class');
409     $q->enqueue($obj);
410
411 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
412
413 =head1 QUEUE CREATION
414
415 =over
416
417 =item ->new()
418
419 Creates a new empty queue.
420
421 =item ->new(LIST)
422
423 Creates a new queue pre-populated with the provided list of items.
424
425 =back
426
427 =head1 BASIC METHODS
428
429 The following methods deal with queues on a FIFO basis.
430
431 =over
432
433 =item ->enqueue(LIST)
434
435 Adds a list of items onto the end of the queue.
436
437 =item ->dequeue()
438
439 =item ->dequeue(COUNT)
440
441 Removes the requested number of items (default is 1) from the head of the
442 queue, and returns them.  If the queue contains fewer than the requested
443 number of items, then the thread will be blocked until the requisite number
444 of items are available (i.e., until other threads C<enqueue> more items).
445
446 =item ->dequeue_nb()
447
448 =item ->dequeue_nb(COUNT)
449
450 Removes the requested number of items (default is 1) from the head of the
451 queue, and returns them.  If the queue contains fewer than the requested
452 number of items, then it immediately (i.e., non-blocking) returns whatever
453 items there are on the queue.  If the queue is empty, then C<undef> is
454 returned.
455
456 =item ->dequeue_timed(TIMEOUT)
457
458 =item ->dequeue_timed(TIMEOUT, COUNT)
459
460 Removes the requested number of items (default is 1) from the head of the
461 queue, and returns them.  If the queue contains fewer than the requested
462 number of items, then the thread will be blocked until the requisite number of
463 items are available, or until the timeout is reached.  If the timeout is
464 reached, it returns whatever items there are on the queue, or C<undef> if the
465 queue is empty.
466
467 The timeout may be a number of seconds relative to the current time (e.g., 5
468 seconds from when the call is made), or may be an absolute timeout in I<epoch>
469 seconds the same as would be used with
470 L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
471 Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
472 the underlying implementation).
473
474 If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
475 behaves the same as C<dequeue_nb>.
476
477 =item ->pending()
478
479 Returns the number of items still in the queue.  Returns C<undef> if the queue
480 has been ended (see below), and there are no more items in the queue.
481
482 =item ->limit
483
484 Sets the size of the queue.  If set, calls to C<enqueue()> will block until
485 the number of pending items in the queue drops below the C<limit>.  The
486 C<limit> does not prevent enqueuing items beyond that count:
487
488     my $q = Thread::Queue->new(1, 2);
489     $q->limit = 4;
490     $q->enqueue(3, 4, 5);   # Does not block
491     $q->enqueue(6);         # Blocks until at least 2 items are dequeued
492
493     my $size = $q->limit;   # Returns the current limit (may return 'undef')
494     $q->limit = 0;          # Queue size is now unlimited
495
496 =item ->end()
497
498 Declares that no more items will be added to the queue.
499
500 All threads blocking on C<dequeue()> calls will be unblocked with any
501 remaining items in the queue and/or C<undef> being returned.  Any subsequent
502 calls to C<dequeue()> will behave like C<dequeue_nb()>.
503
504 Once ended, no more items may be placed in the queue.
505
506 =back
507
508 =head1 ADVANCED METHODS
509
510 The following methods can be used to manipulate items anywhere in a queue.
511
512 To prevent the contents of a queue from being modified by another thread
513 while it is being examined and/or changed, L<lock|threads::shared/"lock
514 VARIABLE"> the queue inside a local block:
515
516     {
517         lock($q);   # Keep other threads from changing the queue's contents
518         my $item = $q->peek();
519         if ($item ...) {
520             ...
521         }
522     }
523     # Queue is now unlocked
524
525 =over
526
527 =item ->peek()
528
529 =item ->peek(INDEX)
530
531 Returns an item from the queue without dequeuing anything.  Defaults to the
532 the head of queue (at index position 0) if no index is specified.  Negative
533 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
534 is the end of the queue, -2 is next to last, and so on).
535
536 If no items exists at the specified index (i.e., the queue is empty, or the
537 index is beyond the number of items on the queue), then C<undef> is returned.
538
539 Remember, the returned item is not removed from the queue, so manipulating a
540 C<peek>ed at reference affects the item on the queue.
541
542 =item ->insert(INDEX, LIST)
543
544 Adds the list of items to the queue at the specified index position (0
545 is the head of the list).  Any existing items at and beyond that position are
546 pushed back past the newly added items:
547
548     $q->enqueue(1, 2, 3, 4);
549     $q->insert(1, qw/foo bar/);
550     # Queue now contains:  1, foo, bar, 2, 3, 4
551
552 Specifying an index position greater than the number of items in the queue
553 just adds the list to the end.
554
555 Negative index positions are supported:
556
557     $q->enqueue(1, 2, 3, 4);
558     $q->insert(-2, qw/foo bar/);
559     # Queue now contains:  1, 2, foo, bar, 3, 4
560
561 Specifying a negative index position greater than the number of items in the
562 queue adds the list to the head of the queue.
563
564 =item ->extract()
565
566 =item ->extract(INDEX)
567
568 =item ->extract(INDEX, COUNT)
569
570 Removes and returns the specified number of items (defaults to 1) from the
571 specified index position in the queue (0 is the head of the queue).  When
572 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
573
574 This method is non-blocking, and will return only as many items as are
575 available to fulfill the request:
576
577     $q->enqueue(1, 2, 3, 4);
578     my $item  = $q->extract(2)     # Returns 3
579                                    # Queue now contains:  1, 2, 4
580     my @items = $q->extract(1, 3)  # Returns (2, 4)
581                                    # Queue now contains:  1
582
583 Specifying an index position greater than the number of items in the
584 queue results in C<undef> or an empty list being returned.
585
586     $q->enqueue('foo');
587     my $nada = $q->extract(3)      # Returns undef
588     my @nada = $q->extract(1, 3)   # Returns ()
589
590 Negative index positions are supported.  Specifying a negative index position
591 greater than the number of items in the queue may return items from the head
592 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
593 queue from the specified position (i.e. if queue size + index + count is
594 greater than zero):
595
596     $q->enqueue(qw/foo bar baz/);
597     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
598     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
599                                      # Queue now contains:  bar, baz
600     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
601
602 =back
603
604 =head1 NOTES
605
606 Queues created by L<Thread::Queue> can be used in both threaded and
607 non-threaded applications.
608
609 =head1 LIMITATIONS
610
611 Passing objects on queues may not work if the objects' classes do not support
612 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
613
614 Passing array/hash refs that contain objects may not work for Perl prior to
615 5.10.0.
616
617 =head1 SEE ALSO
618
619 Thread::Queue Discussion Forum on CPAN:
620 L<http://www.cpanforum.com/dist/Thread-Queue>
621
622 L<threads>, L<threads::shared>
623
624 Sample code in the I<examples> directory of this distribution on CPAN.
625
626 =head1 MAINTAINER
627
628 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
629
630 =head1 LICENSE
631
632 This program is free software; you can redistribute it and/or modify it under
633 the same terms as Perl itself.
634
635 =cut