This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Thread::Queue 2.03
[perl5.git] / lib / Thread / Queue.pm
index 3b5c7c9..f436f04 100644 (file)
 package Thread::Queue;
 
-use threads::shared;
 use strict;
+use warnings;
 
-our $VERSION = '2.00';
+our $VERSION = '2.03';
+
+use threads::shared 0.96;
+use Scalar::Util 1.10 qw(looks_like_number);
+
+# Predeclarations for internal functions
+my ($make_shared, $validate_count, $validate_index);
+
+# Create a new queue possibly pre-populated with items
+sub new
+{
+    my $class = shift;
+    my @queue :shared = map { $make_shared->($_) } @_;
+    return bless(\@queue, $class);
+}
+
+# Add items to the tail of a queue
+sub enqueue
+{
+    my $queue = shift;
+    lock(@$queue);
+    push(@$queue, map { $make_shared->($_) } @_)
+        and cond_signal(@$queue);
+}
+
+# Return a count of the number of items on a queue
+sub pending
+{
+    my $queue = shift;
+    lock(@$queue);
+    return scalar(@$queue);
+}
+
+# Return 1 or more items from the head of a queue, blocking if needed
+sub dequeue
+{
+    my $queue = shift;
+    lock(@$queue);
+
+    my $count = @_ ? $validate_count->(shift) : 1;
+
+    # Wait for requisite number of items
+    cond_wait(@$queue) until (@$queue >= $count);
+    cond_signal(@$queue) if (@$queue > $count);
+
+    # Return single item
+    return shift(@$queue) if ($count == 1);
+
+    # Return multiple items
+    my @items;
+    push(@items, shift(@$queue)) for (1..$count);
+    return @items;
+}
+
+# Return items from the head of a queue with no blocking
+sub dequeue_nb
+{
+    my $queue = shift;
+    lock(@$queue);
+
+    my $count = @_ ? $validate_count->(shift) : 1;
+
+    # Return single item
+    return shift(@$queue) if ($count == 1);
+
+    # Return multiple items
+    my @items;
+    for (1..$count) {
+        last if (! @$queue);
+        push(@items, shift(@$queue));
+    }
+    return @items;
+}
+
+# Return an item without removing it from a queue
+sub peek
+{
+    my $queue = shift;
+    lock(@$queue);
+    my $index = @_ ? $validate_index->(shift) : 0;
+    return $$queue[$index];
+}
+
+# Insert items anywhere into a queue
+sub insert
+{
+    my $queue = shift;
+    lock(@$queue);
+
+    my $index = $validate_index->(shift);
+
+    return if (! @_);   # Nothing to insert
+
+    # Support negative indices
+    if ($index < 0) {
+        $index += @$queue;
+        if ($index < 0) {
+            $index = 0;
+        }
+    }
+
+    # Dequeue items from $index onward
+    my @tmp;
+    while (@$queue > $index) {
+        unshift(@tmp, pop(@$queue))
+    }
+
+    # Add new items to the queue
+    push(@$queue, map { $make_shared->($_) } @_);
+
+    # Add previous items back onto the queue
+    push(@$queue, @tmp);
+
+    # Soup's up
+    cond_signal(@$queue);
+}
+
+# Remove items from anywhere in a queue
+sub extract
+{
+    my $queue = shift;
+    lock(@$queue);
+
+    my $index = @_ ? $validate_index->(shift) : 0;
+    my $count = @_ ? $validate_count->(shift) : 1;
+
+    # Support negative indices
+    if ($index < 0) {
+        $index += @$queue;
+        if ($index < 0) {
+            $count += $index;
+            return if ($count <= 0);            # Beyond the head of the queue
+            return $queue->dequeue_nb($count);  # Extract from the head
+        }
+    }
+
+    # Dequeue items from $index+$count onward
+    my @tmp;
+    while (@$queue > ($index+$count)) {
+        unshift(@tmp, pop(@$queue))
+    }
+
+    # Extract desired items
+    my @items;
+    unshift(@items, pop(@$queue)) while (@$queue > $index);
+
+    # Add back any removed items
+    push(@$queue, @tmp);
+
+    # Return single item
+    return $items[0] if ($count == 1);
+
+    # Return multiple items
+    return @items;
+}
+
+### Internal Functions ###
+
+# Create a thread-shared version of a complex data structure or object
+$make_shared = sub {
+    my $item = shift;
+
+    # If already thread-shared, then just return the input item
+    return $item if (threads::shared::is_shared($item));
+
+    # Make copies of array, hash and scalar refs
+    my $copy;
+    if (my $ref_type = Scalar::Util::reftype($item)) {
+        # Copy an array ref
+        if ($ref_type eq 'ARRAY') {
+            # Make empty shared array ref
+            $copy = &share([]);
+            # Recursively copy and add contents
+            push(@$copy, map { $make_shared->($_) } @$item);
+        }
+
+        # Copy a hash ref
+        elsif ($ref_type eq 'HASH') {
+            # Make empty shared hash ref
+            $copy = &share({});
+            # Recursively copy and add contents
+            foreach my $key (keys(%{$item})) {
+                $copy->{$key} = $make_shared->($item->{$key});
+            }
+        }
+
+        # Copy a scalar ref
+        elsif ($ref_type eq 'SCALAR') {
+            $copy = \do{ my $scalar = $$item; };
+            share($copy);
+            # Clone READONLY flag
+            if (Internals::SvREADONLY($$item)) {
+                Internals::SvREADONLY($$copy, 1);
+            }
+        }
+
+        # Copy of a ref of a ref
+        elsif ($ref_type eq 'REF') {
+            my $tmp = $make_shared->($$item);
+            $copy = \$tmp;
+            share($copy);
+        }
+    }
+
+    # If no copy is created above, then just return the input item
+    # NOTE:  This will end up generating an error for anything
+    #        other than an ordinary scalar
+    return $item if (! defined($copy));
+
+    # Clone READONLY flag
+    if (Internals::SvREADONLY($item)) {
+        Internals::SvREADONLY($copy, 1);
+    }
+
+    # If input item is an object, then bless the copy into the same class
+    if (my $class = Scalar::Util::blessed($item)) {
+        bless($copy, $class);
+    }
+
+    return $copy;
+};
+
+# Check value of the requested index
+$validate_index = sub {
+    my $index = shift;
+
+    if (! looks_like_number($index) || (int($index) != $index)) {
+        require Carp;
+        my ($method) = (caller(1))[3];
+        $method =~ s/Thread::Queue:://;
+        $index = 'undef' if (! defined($index));
+        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
+    }
+
+    return $index;
+};
+
+# Check value of the requested count
+$validate_count = sub {
+    my $count = shift;
+
+    if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
+        require Carp;
+        my ($method) = (caller(1))[3];
+        $method =~ s/Thread::Queue:://;
+        $count = 'undef' if (! defined($count));
+        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
+    }
+
+    return $count;
+};
+
+1;
 
 =head1 NAME
 
-Thread::Queue - thread-safe queues
+Thread::Queue - Thread-safe queues
+
+=head1 VERSION
+
+This document describes Thread::Queue version 2.03
 
 =head1 SYNOPSIS
 
+    use strict;
+    use warnings;
+
+    use threads;
     use Thread::Queue;
-    my $q = new Thread::Queue;
-    $q->enqueue("foo", "bar");
-    my $foo = $q->dequeue;    # The "bar" is still in the queue.
-    my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was empty
-    my $left = $q->pending;   # returns the number of items still in the queue
+
+    my $q = Thread::Queue->new();    # A new empty queue
+
+    # Worker thread
+    my $thr = threads->create(sub {
+                                while (my $item = $q->dequeue()) {
+                                    # Do work on $item
+                                }
+                             })->detach();
+
+    # Send work to the thread
+    $q->enqueue($item1, ...);
+
+
+    # Count of items in the queue
+    my $left = $q->pending();
+
+    # Non-blocking dequeue
+    if (defined(my $item = $q->dequeue_nb())) {
+        # Work on $item
+    }
+
+    # Get the second item in the queue without dequeuing anything
+    my $item = $q->peek(1);
+
+    # Insert two items into the queue just behind the head
+    $q->insert(1, $item1, $item2);
+
+    # Extract the last two items on the queue
+    my ($item1, $item2) = $q->extract(-2, 2);
 
 =head1 DESCRIPTION
 
-A queue, as implemented by C<Thread::Queue> is a thread-safe 
-data structure much like a list.  Any number of threads can safely 
-add elements to the end of the list, or remove elements from the head 
-of the list. (Queues don't permit adding or removing elements from 
-the middle of the list).
+This module provides thread-safe FIFO queues that can be accessed safely by
+any number of threads.
+
+Any data types supported by L<threads::shared> can be passed via queues:
+
+=over
+
+=item Ordinary scalars
+
+=item Array refs
+
+=item Hash refs
+
+=item Scalar refs
+
+=item Objects based on the above
+
+=back
+
+Ordinary scalars are added to queues as they are.
+
+If not already thread-shared, the other complex data types will be cloned
+(recursively, if needed, and including any C<bless>ings and read-only
+settings) into thread-shared structures before being placed onto a queue.
 
-=head1 FUNCTIONS AND METHODS
+For example, the following would cause L<Thread::Queue> to create a empty,
+shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
+and 'baz' from C<@ary> into it, and then place that shared reference onto
+the queue:
 
-=over 8
+    my @ary = qw/foo bar baz/;
+    $q->enqueue(\@ary);
 
-=item new
+However, for the following, the items are already shared, so their references
+are added directly to the queue, and no cloning takes place:
 
-The C<new> function creates a new empty queue.
+    my @ary :shared = qw/foo bar baz/;
+    $q->enqueue(\@ary);
 
-=item enqueue LIST
+    my $obj = &shared({});
+    $$obj{'foo'} = 'bar';
+    $$obj{'qux'} = 99;
+    bless($obj, 'My::Class');
+    $q->enqueue($obj);
 
-The C<enqueue> method adds a list of scalars on to the end of the queue.
-The queue will grow as needed to accommodate the list.
+See L</"LIMITATIONS"> for caveats related to passing objects via queues.
 
-=item dequeue
+=head1 QUEUE CREATION
 
-The C<dequeue> method removes a scalar from the head of the queue and
-returns it. If the queue is currently empty, C<dequeue> will block the
-thread until another thread C<enqueue>s a scalar.
+=over
 
-=item dequeue_nb
+=item ->new()
 
-The C<dequeue_nb> method, like the C<dequeue> method, removes a scalar from
-the head of the queue and returns it. Unlike C<dequeue>, though,
-C<dequeue_nb> won't block if the queue is empty, instead returning
-C<undef>.
+Creates a new empty queue.
 
-=item pending
+=item ->new(LIST)
 
-The C<pending> method returns the number of items still in the queue.
+Creates a new queue pre-populated with the provided list of items.
 
 =back
 
-=head1 SEE ALSO
+=head1 BASIC METHODS
 
-L<threads>, L<threads::shared>
+The following methods deal with queues on a FIFO basis.
 
-=cut
+=over
 
-sub new {
-    my $class = shift;
-    my @q : shared = @_;
-    return bless \@q, $class;
-}
+=item ->enqueue(LIST)
 
-sub dequeue  {
-    my $q = shift;
-    lock(@$q);
-    cond_wait @$q until @$q;
-    cond_signal @$q if @$q > 1;
-    return shift @$q;
-}
+Adds a list of items onto the end of the queue.
 
-sub dequeue_nb {
-    my $q = shift;
-    lock(@$q);
-    return shift @$q;
-}
+=item ->dequeue()
 
-sub enqueue {
-    my $q = shift;
-    lock(@$q);
-    push @$q, @_  and cond_signal @$q;
-}
+=item ->dequeue(COUNT)
 
-sub pending  {
-    my $q = shift;
-    lock(@$q);
-    return scalar(@$q);
-}
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them.  If the queue contains fewer than the requested
+number of items, then the thread will be blocked until the requisite number
+of items are available (i.e., until other threads <enqueue> more items).
 
-1;
+=item ->dequeue_nb()
+
+=item ->dequeue_nb(COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them.  If the queue contains fewer than the requested
+number of items, then it immediately (i.e., non-blocking) returns whatever
+items there are on the queue.  If the queue is empty, then C<undef> is
+returned.
+
+=item ->pending()
+
+Returns the number of items still in the queue.
+
+=back
+
+=head1 ADVANCED METHODS
+
+The following methods can be used to manipulate items anywhere in a queue.
+
+To prevent the contents of a queue from being modified by another thread
+while it is being examined and/or changed, L<lock|threads::shared/"lock
+VARIABLE"> the queue inside a local block:
+
+    {
+        lock($q);   # Keep other threads from changing the queue's contents
+        my $item = $q->peek();
+        if ($item ...) {
+            ...
+        }
+    }
+    # Queue is now unlocked
+
+=over
+
+=item ->peek()
+
+=item ->peek(INDEX)
+
+Returns an item from the queue without dequeuing anything.  Defaults to the
+the head of queue (at index position 0) if no index is specified.  Negative
+index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
+is the end of the queue, -2 is next to last, and so on).
+
+If no items exists at the specified index (i.e., the queue is empty, or the
+index is beyond the number of items on the queue), then C<undef> is returned.
+
+Remember, the returned item is not removed from the queue, so manipulating a
+C<peek>ed at reference affects the item on the queue.
+
+=item ->insert(INDEX, LIST)
+
+Adds the list of items to the queue at the specified index position (0
+is the head of the list).  Any existing items at and beyond that position are
+pushed back past the newly added items:
+
+    $q->enqueue(1, 2, 3, 4);
+    $q->insert(1, qw/foo bar/);
+    # Queue now contains:  1, foo, bar, 2, 3, 4
 
+Specifying an index position greater than the number of items in the queue
+just adds the list to the end.
 
+Negative index positions are supported:
+
+    $q->enqueue(1, 2, 3, 4);
+    $q->insert(-2, qw/foo bar/);
+    # Queue now contains:  1, 2, foo, bar, 3, 4
+
+Specifying a negative index position greater than the number of items in the
+queue adds the list to the head of the queue.
+
+=item ->extract()
+
+=item ->extract(INDEX)
+
+=item ->extract(INDEX, COUNT)
+
+Removes and returns the specified number of items (defaults to 1) from the
+specified index position in the queue (0 is the head of the queue).  When
+called with no arguments, C<extract> operates the same as C<dequeue_nb>.
+
+This method is non-blocking, and will return only as many items as are
+available to fulfill the request:
+
+    $q->enqueue(1, 2, 3, 4);
+    my $item  = $q->extract(2)     # Returns 3
+                                   # Queue now contains:  1, 2, 4
+    my @items = $q->extract(1, 3)  # Returns (2, 4)
+                                   # Queue now contains:  1
+
+Specifying an index position greater than the number of items in the
+queue results in C<undef> or an empty list being returned.
+
+    $q->enqueue('foo');
+    my $nada = $q->extract(3)      # Returns undef
+    my @nada = $q->extract(1, 3)   # Returns ()
+
+Negative index positions are supported.  Specifying a negative index position
+greater than the number of items in the queue may return items from the head
+of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
+queue from the specified position (i.e. if queue size + index + count is
+greater than zero):
+
+    $q->enqueue(qw/foo bar baz/);
+    my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
+    my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
+                                     # Queue now contains:  bar, baz
+    my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
+
+=back
+
+=head1 LIMITATIONS
+
+Passing objects on queues may not work if the objects' classes do not support
+sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
+
+Passing array/hash refs that contain objects may not work for Perl prior to
+5.10.0.
+
+=head1 SEE ALSO
+
+Thread::Queue Discussion Forum on CPAN:
+L<http://www.cpanforum.com/dist/Thread-Queue>
+
+Annotated POD for Thread::Queue:
+L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.03/lib/Thread/Queue.pm>
+
+L<threads>, L<threads::shared>
+
+=head1 MAINTAINER
+
+Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
+
+=head1 LICENSE
+
+This program is free software; you can redistribute it and/or modify it under
+the same terms as Perl itself.
+
+=cut