7 $VERSION = eval $VERSION;
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
15 # Create a new queue possibly pre-populated with items
19 my @queue :shared = map { shared_clone($_) } @_;
20 my %self :shared = ( 'queue' => \@queue );
21 return bless(\%self, $class);
24 # Add items to the tail of a queue
30 if ($$self{'ENDED'}) {
32 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
35 # Block if queue size exceeds any specified limit
36 my $queue = $$self{'queue'};
37 cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
39 # Add items to queue, and then signal other threads
40 push(@$queue, map { shared_clone($_) } @_)
41 and cond_signal(%$self);
44 # Set or return the max. size for a queue
52 # Return a count of the number of items on a queue
57 return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
58 return scalar(@{$$self{'queue'}});
61 # Indicate that no more data will enter the queue
66 # No more data is coming
68 # Try to release at least one blocked thread
72 # Return 1 or more items from the head of a queue, blocking if needed
77 my $queue = $$self{'queue'};
79 my $count = @_ ? $self->_validate_count(shift) : 1;
81 # Wait for requisite number of items
82 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
83 cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
85 # If no longer blocking, try getting whatever is left on the queue
86 return $self->dequeue_nb($count) if ($$self{'ENDED'});
89 return shift(@$queue) if ($count == 1);
91 # Return multiple items
93 push(@items, shift(@$queue)) for (1..$count);
97 # Return items from the head of a queue with no blocking
102 my $queue = $$self{'queue'};
104 my $count = @_ ? $self->_validate_count(shift) : 1;
107 return shift(@$queue) if ($count == 1);
109 # Return multiple items
113 push(@items, shift(@$queue));
118 # Return items from the head of a queue, blocking if needed up to a timeout
123 my $queue = $$self{'queue'};
125 # Timeout may be relative or absolute
126 my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
127 # Convert to an absolute time for use with cond_timedwait()
128 if ($timeout < 32000000) { # More than one year
132 my $count = @_ ? $self->_validate_count(shift) : 1;
134 # Wait for requisite number of items, or until timeout
135 while ((@$queue < $count) && ! $$self{'ENDED'}) {
136 last if (! cond_timedwait(%$self, $timeout));
138 cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
140 # Get whatever we need off the queue if available
141 return $self->dequeue_nb($count);
144 # Return an item without removing it from a queue
149 my $index = @_ ? $self->_validate_index(shift) : 0;
150 return $$self{'queue'}[$index];
153 # Insert items anywhere into a queue
159 if ($$self{'ENDED'}) {
161 Carp::croak("'insert' method called on queue that has been 'end'ed");
164 my $queue = $$self{'queue'};
166 my $index = $self->_validate_index(shift);
168 return if (! @_); # Nothing to insert
170 # Support negative indices
178 # Dequeue items from $index onward
180 while (@$queue > $index) {
181 unshift(@tmp, pop(@$queue))
184 # Add new items to the queue
185 push(@$queue, map { shared_clone($_) } @_);
187 # Add previous items back onto the queue
194 # Remove items from anywhere in a queue
199 my $queue = $$self{'queue'};
201 my $index = @_ ? $self->_validate_index(shift) : 0;
202 my $count = @_ ? $self->_validate_count(shift) : 1;
204 # Support negative indices
209 return if ($count <= 0); # Beyond the head of the queue
210 return $self->dequeue_nb($count); # Extract from the head
214 # Dequeue items from $index+$count onward
216 while (@$queue > ($index+$count)) {
217 unshift(@tmp, pop(@$queue))
220 # Extract desired items
222 unshift(@items, pop(@$queue)) while (@$queue > $index);
224 # Add back any removed items
228 return $items[0] if ($count == 1);
230 # Return multiple items
234 ### Internal Methods ###
236 # Check value of the requested index
242 if (! defined($index) ||
243 ! looks_like_number($index) ||
244 (int($index) != $index))
247 my ($method) = (caller(1))[3];
248 my $class_name = ref($self);
249 $method =~ s/$class_name\:://;
250 $index = 'undef' if (! defined($index));
251 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
257 # Check value of the requested count
263 if (! defined($count) ||
264 ! looks_like_number($count) ||
265 (int($count) != $count) ||
269 my ($method) = (caller(1))[3];
270 my $class_name = ref($self);
271 $method =~ s/$class_name\:://;
272 $count = 'undef' if (! defined($count));
273 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
279 # Check value of the requested timeout
280 sub _validate_timeout
285 if (! defined($timeout) ||
286 ! looks_like_number($timeout))
289 my ($method) = (caller(1))[3];
290 my $class_name = ref($self);
291 $method =~ s/$class_name\:://;
292 $timeout = 'undef' if (! defined($timeout));
293 Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
303 Thread::Queue - Thread-safe queues
307 This document describes Thread::Queue version 3.06
317 my $q = Thread::Queue->new(); # A new empty queue
320 my $thr = threads->create(
322 # Thread will loop until no more work
323 while (defined(my $item = $q->dequeue())) {
330 # Send work to the thread
331 $q->enqueue($item1, ...);
332 # Signal that there is no more work to be sent
334 # Join up with the thread when it finishes
339 # Count of items in the queue
340 my $left = $q->pending();
342 # Non-blocking dequeue
343 if (defined(my $item = $q->dequeue_nb())) {
347 # Blocking dequeue with 5-second timeout
348 if (defined(my $item = $q->dequeue_timed(5))) {
352 # Set a size for a queue
355 # Get the second item in the queue without dequeuing anything
356 my $item = $q->peek(1);
358 # Insert two items into the queue just behind the head
359 $q->insert(1, $item1, $item2);
361 # Extract the last two items on the queue
362 my ($item1, $item2) = $q->extract(-2, 2);
366 This module provides thread-safe FIFO queues that can be accessed safely by
367 any number of threads.
369 Any data types supported by L<threads::shared> can be passed via queues:
373 =item Ordinary scalars
381 =item Objects based on the above
385 Ordinary scalars are added to queues as they are.
387 If not already thread-shared, the other complex data types will be cloned
388 (recursively, if needed, and including any C<bless>ings and read-only
389 settings) into thread-shared structures before being placed onto a queue.
391 For example, the following would cause L<Thread::Queue> to create an empty,
392 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
393 and 'baz' from C<@ary> into it, and then place that shared reference onto
396 my @ary = qw/foo bar baz/;
399 However, for the following, the items are already shared, so their references
400 are added directly to the queue, and no cloning takes place:
402 my @ary :shared = qw/foo bar baz/;
405 my $obj = &shared({});
406 $$obj{'foo'} = 'bar';
408 bless($obj, 'My::Class');
411 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
413 =head1 QUEUE CREATION
419 Creates a new empty queue.
423 Creates a new queue pre-populated with the provided list of items.
429 The following methods deal with queues on a FIFO basis.
433 =item ->enqueue(LIST)
435 Adds a list of items onto the end of the queue.
439 =item ->dequeue(COUNT)
441 Removes the requested number of items (default is 1) from the head of the
442 queue, and returns them. If the queue contains fewer than the requested
443 number of items, then the thread will be blocked until the requisite number
444 of items are available (i.e., until other threads C<enqueue> more items).
448 =item ->dequeue_nb(COUNT)
450 Removes the requested number of items (default is 1) from the head of the
451 queue, and returns them. If the queue contains fewer than the requested
452 number of items, then it immediately (i.e., non-blocking) returns whatever
453 items there are on the queue. If the queue is empty, then C<undef> is
456 =item ->dequeue_timed(TIMEOUT)
458 =item ->dequeue_timed(TIMEOUT, COUNT)
460 Removes the requested number of items (default is 1) from the head of the
461 queue, and returns them. If the queue contains fewer than the requested
462 number of items, then the thread will be blocked until the requisite number of
463 items are available, or until the timeout is reached. If the timeout is
464 reached, it returns whatever items there are on the queue, or C<undef> if the
467 The timeout may be a number of seconds relative to the current time (e.g., 5
468 seconds from when the call is made), or may be an absolute timeout in I<epoch>
469 seconds the same as would be used with
470 L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
471 Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
472 the underlying implementation).
474 If C<TIMEOUT> is missing or is less than or equal to 0, then this call behaves
475 the same as C<dequeue_nb>. An undefined or non-numeric C<TIMEOUT> is an error.
479 Returns the number of items still in the queue. Returns C<undef> if the queue
480 has been ended (see below), and there are no more items in the queue.
484 Sets the size of the queue. If set, calls to C<enqueue()> will block until
485 the number of pending items in the queue drops below the C<limit>. The
486 C<limit> does not prevent enqueuing items beyond that count:
488 my $q = Thread::Queue->new(1, 2);
490 $q->enqueue(3, 4, 5); # Does not block
491 $q->enqueue(6); # Blocks until at least 2 items are dequeued
493 my $size = $q->limit; # Returns the current limit (may return 'undef')
494 $q->limit = 0; # Queue size is now unlimited
498 Declares that no more items will be added to the queue.
500 All threads blocking on C<dequeue()> calls will be unblocked with any
501 remaining items in the queue and/or C<undef> being returned. Any subsequent
502 calls to C<dequeue()> will behave like C<dequeue_nb()>.
504 Once ended, no more items may be placed in the queue.
508 =head1 ADVANCED METHODS
510 The following methods can be used to manipulate items anywhere in a queue.
512 To prevent the contents of a queue from being modified by another thread
513 while it is being examined and/or changed, L<lock|threads::shared/"lock
514 VARIABLE"> the queue inside a local block:
517 lock($q); # Keep other threads from changing the queue's contents
518 my $item = $q->peek();
523 # Queue is now unlocked
531 Returns an item from the queue without dequeuing anything. Defaults to the
532 head of the queue (at index position 0) if no index is specified. Negative
533 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
534 is the end of the queue, -2 is next to last, and so on).
536 If no item exists at the specified index (i.e., the queue is empty, or the
537 index is beyond the number of items on the queue), then C<undef> is returned.
539 Remember, the returned item is not removed from the queue, so manipulating a
540 C<peek>ed at reference affects the item on the queue.
542 =item ->insert(INDEX, LIST)
544 Adds the list of items to the queue at the specified index position (0
545 is the head of the queue). Any existing items at and beyond that position are
546 pushed back past the newly added items:
548 $q->enqueue(1, 2, 3, 4);
549 $q->insert(1, qw/foo bar/);
550 # Queue now contains: 1, foo, bar, 2, 3, 4
552 Specifying an index position greater than the number of items in the queue
553 just adds the list to the end.
555 Negative index positions are supported:
557 $q->enqueue(1, 2, 3, 4);
558 $q->insert(-2, qw/foo bar/);
559 # Queue now contains: 1, 2, foo, bar, 3, 4
561 Specifying a negative index position greater than the number of items in the
562 queue adds the list to the head of the queue.
566 =item ->extract(INDEX)
568 =item ->extract(INDEX, COUNT)
570 Removes and returns the specified number of items (defaults to 1) from the
571 specified index position in the queue (0 is the head of the queue). When
572 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
574 This method is non-blocking, and will return only as many items as are
575 available to fulfill the request:
577 $q->enqueue(1, 2, 3, 4);
578 my $item = $q->extract(2); # Returns 3
579 # Queue now contains: 1, 2, 4
580 my @items = $q->extract(1, 3); # Returns (2, 4)
581 # Queue now contains: 1
583 Specifying an index position greater than the number of items in the
584 queue results in C<undef> or an empty list being returned.
587 my $nada = $q->extract(3); # Returns undef
588 my @nada = $q->extract(1, 3); # Returns ()
590 Negative index positions are supported. Specifying a negative index position
591 greater than the number of items in the queue may return items from the head
592 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
593 queue from the specified position (i.e. if queue size + index + count is
596 $q->enqueue(qw/foo bar baz/);
597 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
598 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
599 # Queue now contains: bar, baz
600 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
606 Queues created by L<Thread::Queue> can be used in both threaded and
607 non-threaded applications.
611 Passing objects on queues may not work if the objects' classes do not support
612 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
614 Passing array/hash refs that contain objects may not work for Perl prior to
619 Thread::Queue Discussion Forum on CPAN:
620 L<http://www.cpanforum.com/dist/Thread-Queue>
622 L<threads>, L<threads::shared>
624 Sample code in the I<examples> directory of this distribution on CPAN.
628 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
632 This program is free software; you can redistribute it and/or modify it under
633 the same terms as Perl itself.