7 $VERSION = eval $VERSION;
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
15 # Create a new queue possibly pre-populated with items
19 my @queue :shared = map { shared_clone($_) } @_;
20 my %self :shared = ( 'queue' => \@queue );
21 return bless(\%self, $class);
24 # Add items to the tail of a queue
29 if ($$self{'ENDED'}) {
31 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
33 push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
34 and cond_signal(%$self);
37 # Return a count of the number of items on a queue
42 return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
43 return scalar(@{$$self{'queue'}});
46 # Indicate that no more data will enter the queue
51 # No more data is coming
53 # Try to release at least one blocked thread
57 # Return 1 or more items from the head of a queue, blocking if needed
62 my $queue = $$self{'queue'};
64 my $count = @_ ? $self->_validate_count(shift) : 1;
66 # Wait for requisite number of items
67 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
68 cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
70 # If no longer blocking, try getting whatever is left on the queue
71 return $self->dequeue_nb($count) if ($$self{'ENDED'});
74 return shift(@$queue) if ($count == 1);
76 # Return multiple items
78 push(@items, shift(@$queue)) for (1..$count);
82 # Return items from the head of a queue with no blocking
87 my $queue = $$self{'queue'};
89 my $count = @_ ? $self->_validate_count(shift) : 1;
92 return shift(@$queue) if ($count == 1);
94 # Return multiple items
98 push(@items, shift(@$queue));
103 # Return items from the head of a queue, blocking if needed up to a timeout
108 my $queue = $$self{'queue'};
110 # Timeout may be relative or absolute
111 my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
112 # Convert a relative timeout (one below the ~1-year threshold) to an absolute time for use with cond_timedwait()
113 if ($timeout < 32000000) { # More than one year
117 my $count = @_ ? $self->_validate_count(shift) : 1;
119 # Wait for requisite number of items, or until timeout
120 while ((@$queue < $count) && ! $$self{'ENDED'}) {
121 last if (! cond_timedwait(%$self, $timeout));
123 cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
125 # Get whatever we need off the queue if available
126 return $self->dequeue_nb($count);
129 # Return an item without removing it from a queue
134 my $index = @_ ? $self->_validate_index(shift) : 0;
135 return $$self{'queue'}[$index];
138 # Insert items anywhere into a queue
144 if ($$self{'ENDED'}) {
146 Carp::croak("'insert' method called on queue that has been 'end'ed");
149 my $queue = $$self{'queue'};
151 my $index = $self->_validate_index(shift);
153 return if (! @_); # Nothing to insert
155 # Support negative indices
163 # Dequeue items from $index onward
165 while (@$queue > $index) {
166 unshift(@tmp, pop(@$queue))
169 # Add new items to the queue
170 push(@$queue, map { shared_clone($_) } @_);
172 # Add previous items back onto the queue
179 # Remove items from anywhere in a queue
184 my $queue = $$self{'queue'};
186 my $index = @_ ? $self->_validate_index(shift) : 0;
187 my $count = @_ ? $self->_validate_count(shift) : 1;
189 # Support negative indices
194 return if ($count <= 0); # Beyond the head of the queue
195 return $self->dequeue_nb($count); # Extract from the head
199 # Dequeue items from $index+$count onward
201 while (@$queue > ($index+$count)) {
202 unshift(@tmp, pop(@$queue))
205 # Extract desired items
207 unshift(@items, pop(@$queue)) while (@$queue > $index);
209 # Add back any removed items
213 return $items[0] if ($count == 1);
215 # Return multiple items
219 ### Internal Methods ###
221 # Check value of the requested index
227 if (! defined($index) ||
228 ! looks_like_number($index) ||
229 (int($index) != $index))
232 my ($method) = (caller(1))[3];
233 my $class_name = ref($self);
234 $method =~ s/$class_name\:://;
235 $index = 'undef' if (! defined($index));
236 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
242 # Check value of the requested count
248 if (! defined($count) ||
249 ! looks_like_number($count) ||
250 (int($count) != $count) ||
254 my ($method) = (caller(1))[3];
255 my $class_name = ref($self);
256 $method =~ s/$class_name\:://;
257 $count = 'undef' if (! defined($count));
258 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
264 # Check value of the requested timeout
265 sub _validate_timeout
270 if (! defined($timeout) ||
271 ! looks_like_number($timeout))
274 my ($method) = (caller(1))[3];
275 my $class_name = ref($self);
276 $method =~ s/$class_name\:://;
277 $timeout = 'undef' if (! defined($timeout));
278 Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
288 Thread::Queue - Thread-safe queues
292 This document describes Thread::Queue version 3.05
302 my $q = Thread::Queue->new(); # A new empty queue
305 my $thr = threads->create(
307 # Thread will loop until no more work
308 while (defined(my $item = $q->dequeue())) {
315 # Send work to the thread
316 $q->enqueue($item1, ...);
317 # Signal that there is no more work to be sent
319 # Join up with the thread when it finishes
324 # Count of items in the queue
325 my $left = $q->pending();
327 # Non-blocking dequeue
328 if (defined(my $item = $q->dequeue_nb())) {
332 # Blocking dequeue with 5-second timeout
333 if (defined(my $item = $q->dequeue_timed(5))) {
337 # Get the second item in the queue without dequeuing anything
338 my $item = $q->peek(1);
340 # Insert two items into the queue just behind the head
341 $q->insert(1, $item1, $item2);
343 # Extract the last two items on the queue
344 my ($item1, $item2) = $q->extract(-2, 2);
348 This module provides thread-safe FIFO queues that can be accessed safely by
349 any number of threads.
351 Any data types supported by L<threads::shared> can be passed via queues:
355 =item Ordinary scalars
363 =item Objects based on the above
367 Ordinary scalars are added to queues as they are.
369 If not already thread-shared, the other complex data types will be cloned
370 (recursively, if needed, and including any C<bless>ings and read-only
371 settings) into thread-shared structures before being placed onto a queue.
373 For example, the following would cause L<Thread::Queue> to create an empty,
374 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
375 and 'baz' from C<@ary> into it, and then place that shared reference onto
378 my @ary = qw/foo bar baz/;
381 However, for the following, the items are already shared, so their references
382 are added directly to the queue, and no cloning takes place:
384 my @ary :shared = qw/foo bar baz/;
387 my $obj = &shared({});
388 $$obj{'foo'} = 'bar';
390 bless($obj, 'My::Class');
393 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
395 =head1 QUEUE CREATION
401 Creates a new empty queue.
405 Creates a new queue pre-populated with the provided list of items.
411 The following methods deal with queues on a FIFO basis.
415 =item ->enqueue(LIST)
417 Adds a list of items onto the end of the queue.
421 =item ->dequeue(COUNT)
423 Removes the requested number of items (default is 1) from the head of the
424 queue, and returns them. If the queue contains fewer than the requested
425 number of items, then the thread will be blocked until the requisite number
426 of items are available (i.e., until other threads C<enqueue> more items).
430 =item ->dequeue_nb(COUNT)
432 Removes the requested number of items (default is 1) from the head of the
433 queue, and returns them. If the queue contains fewer than the requested
434 number of items, then it immediately (i.e., non-blocking) returns whatever
435 items there are on the queue. If the queue is empty, then C<undef> is
438 =item ->dequeue_timed(TIMEOUT)
440 =item ->dequeue_timed(TIMEOUT, COUNT)
442 Removes the requested number of items (default is 1) from the head of the
443 queue, and returns them. If the queue contains fewer than the requested
444 number of items, then the thread will be blocked until the requisite number of
445 items are available, or until the timeout is reached. If the timeout is
446 reached, it returns whatever items there are on the queue, or C<undef> if the
449 The timeout may be a number of seconds relative to the current time (e.g., 5
450 seconds from when the call is made), or may be an absolute timeout in I<epoch>
451 seconds the same as would be used with
452 L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
453 Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
454 the underlying implementation).
456 If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
457 behaves the same as C<dequeue_nb>.
461 Returns the number of items still in the queue. Returns C<undef> if the queue
462 has been ended (see below), and there are no more items in the queue.
466 Declares that no more items will be added to the queue.
468 All threads blocking on C<dequeue()> calls will be unblocked with any
469 remaining items in the queue and/or C<undef> being returned. Any subsequent
470 calls to C<dequeue()> will behave like C<dequeue_nb()>.
472 Once ended, no more items may be placed in the queue.
476 =head1 ADVANCED METHODS
478 The following methods can be used to manipulate items anywhere in a queue.
480 To prevent the contents of a queue from being modified by another thread
481 while it is being examined and/or changed, L<lock|threads::shared/"lock
482 VARIABLE"> the queue inside a local block:
485 lock($q); # Keep other threads from changing the queue's contents
486 my $item = $q->peek();
491 # Queue is now unlocked
499 Returns an item from the queue without dequeuing anything. Defaults to the
500 head of the queue (at index position 0) if no index is specified. Negative
501 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
502 is the end of the queue, -2 is next to last, and so on).
504 If no item exists at the specified index (i.e., the queue is empty, or the
505 index is beyond the number of items on the queue), then C<undef> is returned.
507 Remember, the returned item is not removed from the queue, so manipulating a
508 C<peek>ed at reference affects the item on the queue.
510 =item ->insert(INDEX, LIST)
512 Adds the list of items to the queue at the specified index position (0
513 is the head of the list). Any existing items at and beyond that position are
514 pushed back past the newly added items:
516 $q->enqueue(1, 2, 3, 4);
517 $q->insert(1, qw/foo bar/);
518 # Queue now contains: 1, foo, bar, 2, 3, 4
520 Specifying an index position greater than the number of items in the queue
521 just adds the list to the end.
523 Negative index positions are supported:
525 $q->enqueue(1, 2, 3, 4);
526 $q->insert(-2, qw/foo bar/);
527 # Queue now contains: 1, 2, foo, bar, 3, 4
529 Specifying a negative index position greater than the number of items in the
530 queue adds the list to the head of the queue.
534 =item ->extract(INDEX)
536 =item ->extract(INDEX, COUNT)
538 Removes and returns the specified number of items (defaults to 1) from the
539 specified index position in the queue (0 is the head of the queue). When
540 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
542 This method is non-blocking, and will return only as many items as are
543 available to fulfill the request:
545 $q->enqueue(1, 2, 3, 4);
546 my $item = $q->extract(2); # Returns 3
547 # Queue now contains: 1, 2, 4
548 my @items = $q->extract(1, 3); # Returns (2, 4)
549 # Queue now contains: 1
551 Specifying an index position greater than the number of items in the
552 queue results in C<undef> or an empty list being returned.
555 my $nada = $q->extract(3); # Returns undef
556 my @nada = $q->extract(1, 3); # Returns ()
558 Negative index positions are supported. Specifying a negative index position
559 greater than the number of items in the queue may return items from the head
560 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
561 queue from the specified position (i.e. if queue size + index + count is
564 $q->enqueue(qw/foo bar baz/);
565 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
566 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
567 # Queue now contains: bar, baz
568 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
574 Queues created by L<Thread::Queue> can be used in both threaded and
575 non-threaded applications.
579 Passing objects on queues may not work if the objects' classes do not support
580 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
582 Passing array/hash refs that contain objects may not work for Perl prior to
587 Thread::Queue Discussion Forum on CPAN:
588 L<http://www.cpanforum.com/dist/Thread-Queue>
590 L<threads>, L<threads::shared>
592 Sample code in the I<examples> directory of this distribution on CPAN.
596 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
600 This program is free software; you can redistribute it and/or modify it under
601 the same terms as Perl itself.