This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.01
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
index 8588ed5..0bf1624 100644 (file)
@@ -3,7 +3,7 @@ package Thread::Queue;
 use strict;
 use warnings;
 
-our $VERSION = '2.12';
+our $VERSION = '3.01';
 $VERSION = eval $VERSION;
 
 use threads::shared 1.21;
@@ -20,37 +20,58 @@ sub new
 {
     my $class = shift;
     my @queue :shared = map { shared_clone($_) } @_;
-    return bless(\@queue, $class);
+    my %self :shared = ( 'queue' => \@queue );
+    return bless(\%self, $class);
 }
 
 # Add items to the tail of a queue
 sub enqueue
 {
-    my $queue = shift;
-    lock(@$queue);
-    push(@$queue, map { shared_clone($_) } @_)
-        and cond_signal(@$queue);
+    my $self = shift;
+    lock(%$self);
+    if ($$self{'ENDED'}) {
+        require Carp;
+        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
+    }
+    push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+        and cond_signal(%$self);
 }
 
 # Return a count of the number of items on a queue
 sub pending
 {
-    my $queue = shift;
-    lock(@$queue);
-    return scalar(@$queue);
+    my $self = shift;
+    lock(%$self);
+    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
+    return scalar(@{$$self{'queue'}});
+}
+
+# Indicate that no more data will enter the queue
+sub end
+{
+    my $self = shift;
+    lock $self;
+    # No more data is coming
+    $$self{'ENDED'} = 1;
+    # Try to release at least one blocked thread
+    cond_signal(%$self);
 }
 
 # Return 1 or more items from the head of a queue, blocking if needed
 sub dequeue
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $count = @_ ? $validate_count->(shift) : 1;
 
     # Wait for requisite number of items
-    cond_wait(@$queue) until (@$queue >= $count);
-    cond_signal(@$queue) if (@$queue > $count);
+    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
+    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+    # If no longer blocking, try getting whatever is left on the queue
+    return $self->dequeue_nb($count) if ($$self{'ENDED'});
 
     # Return single item
     return shift(@$queue) if ($count == 1);
@@ -64,8 +85,9 @@ sub dequeue
 # Return items from the head of a queue with no blocking
 sub dequeue_nb
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $count = @_ ? $validate_count->(shift) : 1;
 
@@ -84,17 +106,24 @@ sub dequeue_nb
 # Return an item without removing it from a queue
 sub peek
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
     my $index = @_ ? $validate_index->(shift) : 0;
-    return $$queue[$index];
+    return $$self{'queue'}[$index];
 }
 
 # Insert items anywhere into a queue
 sub insert
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+
+    if ($$self{'ENDED'}) {
+        require Carp;
+        Carp::croak("'insert' method called on queue that has been 'end'ed");
+    }
+
+    my $queue = $$self{'queue'};
 
     my $index = $validate_index->(shift);
 
@@ -121,14 +150,15 @@ sub insert
     push(@$queue, @tmp);
 
     # Soup's up
-    cond_signal(@$queue);
+    cond_signal(%$self);
 }
 
 # Remove items from anywhere in a queue
 sub extract
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $index = @_ ? $validate_index->(shift) : 0;
     my $count = @_ ? $validate_count->(shift) : 1;
@@ -139,7 +169,7 @@ sub extract
         if ($index < 0) {
             $count += $index;
             return if ($count <= 0);            # Beyond the head of the queue
-            return $queue->dequeue_nb($count);  # Extract from the head
+            return $self->dequeue_nb($count);  # Extract from the head
         }
     }
 
@@ -210,7 +240,7 @@ Thread::Queue - Thread-safe queues
 
 =head1 VERSION
 
-This document describes Thread::Queue version 2.12
+This document describes Thread::Queue version 3.01
 
 =head1 SYNOPSIS
 
@@ -223,15 +253,24 @@ This document describes Thread::Queue version 2.12
     my $q = Thread::Queue->new();    # A new empty queue
 
     # Worker thread
-    my $thr = threads->create(sub {
-                                while (my $item = $q->dequeue()) {
-                                    # Do work on $item
-                                }
-                             })->detach();
+    my $thr = threads->create(
+        sub {
+            # Thread will loop until no more work
+            while (defined(my $item = $q->dequeue())) {
+                # Do work on $item
+                ...
+            }
+        }
+    );
 
     # Send work to the thread
     $q->enqueue($item1, ...);
+    # Signal that there is no more work to be sent
+    $q->end();
+    # Join up with the thread when it finishes
+    $thr->join();
 
+    ...
 
     # Count of items in the queue
     my $left = $q->pending();
@@ -344,7 +383,18 @@ returned.
 
 =item ->pending()
 
-Returns the number of items still in the queue.
+Returns the number of items still in the queue.  Returns C<undef> if the queue
+has been ended (see below), and there are no more items in the queue.
+
+=item ->end()
+
+Declares that no more items will be added to the queue.
+
+All threads blocking on C<dequeue()> calls will be unblocked with any
+remaining items in the queue and/or C<undef> being returned.  Any subsequent
+calls to C<dequeue()> will behave like C<dequeue_nb()>.
+
+Once ended, no more items may be placed in the queue.
 
 =back
 
@@ -464,6 +514,8 @@ L<http://www.cpanforum.com/dist/Thread-Queue>
 
 L<threads>, L<threads::shared>
 
+Sample code in the I<examples> directory of this distribution on CPAN.
+
 =head1 MAINTAINER
 
 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>