Upgrade to Thread::Queue 3.01
authorJerry D. Hedden <jdhedden@cpan.org>
Wed, 24 Oct 2012 02:48:50 +0000 (22:48 -0400)
committerFather Chrysostomos <sprout@cpan.org>
Tue, 6 Nov 2012 20:33:38 +0000 (12:33 -0800)
MANIFEST [changed mode: 0644->0755]
Porting/Maintainers.pl
dist/Thread-Queue/lib/Thread/Queue.pm
dist/Thread-Queue/t/09_ended.t [new file with mode: 0644]

old mode 100644 (file)
new mode 100755 (executable)
index 69cb28e..e276e34
--- a/MANIFEST
+++ b/MANIFEST
@@ -3522,6 +3522,7 @@ dist/Thread-Queue/t/05_extract.t  Thread::Queue tests
 dist/Thread-Queue/t/06_insert.t                Thread::Queue tests
 dist/Thread-Queue/t/07_lock.t          Thread::Queue tests
 dist/Thread-Queue/t/08_nothreads.t     Thread::Queue tests
+dist/Thread-Queue/t/09_ended.t         Thread::Queue tests
 dist/Thread-Semaphore/lib/Thread/Semaphore.pm  Thread-safe semaphores
 dist/Thread-Semaphore/t/01_basic.t             Thread::Semaphore tests
 dist/Thread-Semaphore/t/02_errs.t              Thread::Semaphore tests
index 282eacc..e86c59c 100755 (executable)
@@ -1896,11 +1896,11 @@ use File::Glob qw(:case);
 
     'Thread::Queue' => {
         'MAINTAINER'   => 'jdhedden',
-        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-2.12.tar.gz',
+        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.01.tar.gz',
         'FILES'        => q[dist/Thread-Queue],
         'EXCLUDED'     => [
-            qw( examples/queue.pl
-                t/00_load.t
+            qr{^examples/},
+            qw( t/00_load.t
                 t/99_pod.t
                 t/test.pl
                 ),
index 8588ed5..0bf1624 100644 (file)
@@ -3,7 +3,7 @@ package Thread::Queue;
 use strict;
 use warnings;
 
-our $VERSION = '2.12';
+our $VERSION = '3.01';
 $VERSION = eval $VERSION;
 
 use threads::shared 1.21;
@@ -20,37 +20,58 @@ sub new
 {
     my $class = shift;
     my @queue :shared = map { shared_clone($_) } @_;
-    return bless(\@queue, $class);
+    my %self :shared = ( 'queue' => \@queue );
+    return bless(\%self, $class);
 }
 
 # Add items to the tail of a queue
 sub enqueue
 {
-    my $queue = shift;
-    lock(@$queue);
-    push(@$queue, map { shared_clone($_) } @_)
-        and cond_signal(@$queue);
+    my $self = shift;
+    lock(%$self);
+    if ($$self{'ENDED'}) {
+        require Carp;
+        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
+    }
+    push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+        and cond_signal(%$self);
 }
 
 # Return a count of the number of items on a queue
 sub pending
 {
-    my $queue = shift;
-    lock(@$queue);
-    return scalar(@$queue);
+    my $self = shift;
+    lock(%$self);
+    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
+    return scalar(@{$$self{'queue'}});
+}
+
+# Indicate that no more data will enter the queue
+sub end
+{
+    my $self = shift;
+    lock $self;
+    # No more data is coming
+    $$self{'ENDED'} = 1;
+    # Try to release at least one blocked thread
+    cond_signal(%$self);
 }
 
 # Return 1 or more items from the head of a queue, blocking if needed
 sub dequeue
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $count = @_ ? $validate_count->(shift) : 1;
 
     # Wait for requisite number of items
-    cond_wait(@$queue) until (@$queue >= $count);
-    cond_signal(@$queue) if (@$queue > $count);
+    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
+    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+    # If no longer blocking, try getting whatever is left on the queue
+    return $self->dequeue_nb($count) if ($$self{'ENDED'});
 
     # Return single item
     return shift(@$queue) if ($count == 1);
@@ -64,8 +85,9 @@ sub dequeue
 # Return items from the head of a queue with no blocking
 sub dequeue_nb
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $count = @_ ? $validate_count->(shift) : 1;
 
@@ -84,17 +106,24 @@ sub dequeue_nb
 # Return an item without removing it from a queue
 sub peek
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
     my $index = @_ ? $validate_index->(shift) : 0;
-    return $$queue[$index];
+    return $$self{'queue'}[$index];
 }
 
 # Insert items anywhere into a queue
 sub insert
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+
+    if ($$self{'ENDED'}) {
+        require Carp;
+        Carp::croak("'insert' method called on queue that has been 'end'ed");
+    }
+
+    my $queue = $$self{'queue'};
 
     my $index = $validate_index->(shift);
 
@@ -121,14 +150,15 @@ sub insert
     push(@$queue, @tmp);
 
     # Soup's up
-    cond_signal(@$queue);
+    cond_signal(%$self);
 }
 
 # Remove items from anywhere in a queue
 sub extract
 {
-    my $queue = shift;
-    lock(@$queue);
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
 
     my $index = @_ ? $validate_index->(shift) : 0;
     my $count = @_ ? $validate_count->(shift) : 1;
@@ -139,7 +169,7 @@ sub extract
         if ($index < 0) {
             $count += $index;
             return if ($count <= 0);            # Beyond the head of the queue
-            return $queue->dequeue_nb($count);  # Extract from the head
+            return $self->dequeue_nb($count);  # Extract from the head
         }
     }
 
@@ -210,7 +240,7 @@ Thread::Queue - Thread-safe queues
 
 =head1 VERSION
 
-This document describes Thread::Queue version 2.12
+This document describes Thread::Queue version 3.01
 
 =head1 SYNOPSIS
 
@@ -223,15 +253,24 @@ This document describes Thread::Queue version 2.12
     my $q = Thread::Queue->new();    # A new empty queue
 
     # Worker thread
-    my $thr = threads->create(sub {
-                                while (my $item = $q->dequeue()) {
-                                    # Do work on $item
-                                }
-                             })->detach();
+    my $thr = threads->create(
+        sub {
+            # Thread will loop until no more work
+            while (defined(my $item = $q->dequeue())) {
+                # Do work on $item
+                ...
+            }
+        }
+    );
 
     # Send work to the thread
     $q->enqueue($item1, ...);
+    # Signal that there is no more work to be sent
+    $q->end();
+    # Join up with the thread when it finishes
+    $thr->join();
 
+    ...
 
     # Count of items in the queue
     my $left = $q->pending();
@@ -344,7 +383,18 @@ returned.
 
 =item ->pending()
 
-Returns the number of items still in the queue.
+Returns the number of items still in the queue.  Returns C<undef> if the queue
+has been ended (see below), and there are no more items in the queue.
+
+=item ->end()
+
+Declares that no more items will be added to the queue.
+
+All threads blocking on C<dequeue()> calls will be unblocked with any
+remaining items in the queue and/or C<undef> being returned.  Any subsequent
+calls to C<dequeue()> will behave like C<dequeue_nb()>.
+
+Once ended, no more items may be placed in the queue.
 
 =back
 
@@ -464,6 +514,8 @@ L<http://www.cpanforum.com/dist/Thread-Queue>
 
 L<threads>, L<threads::shared>
 
+Sample code in the I<examples> directory of this distribution on CPAN.
+
 =head1 MAINTAINER
 
 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
diff --git a/dist/Thread-Queue/t/09_ended.t b/dist/Thread-Queue/t/09_ended.t
new file mode 100644 (file)
index 0000000..a0a9292
--- /dev/null
@@ -0,0 +1,146 @@
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+    if (! $Config{'useithreads'}) {
+        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+        exit(0);
+    }
+    if (! $Config{'d_select'}) {
+        print("1..0 # SKIP 'select()' not available for testing\n");
+        exit(0);
+    }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+my $num_threads = 3;
+my $cycles = 2;
+my $count = 2;
+plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;
+
+# Test for end() while threads are blocked and no more items in queue
+{
+    my @items = 1..($num_threads*$cycles*$count);
+    my $q = Thread::Queue->new(@items);
+    my $r = Thread::Queue->new();
+
+    my @threads;
+    for my $ii (1..$num_threads) {
+        push @threads, threads->create( sub {
+            # Thread will loop until no more work is coming
+            LOOP:
+            while (my @set = $q->dequeue($count)) {
+                foreach my $item (@set) {
+                    last LOOP if (! defined($item));
+                    pass("'$item' read from queue in thread $ii");
+                }
+                select(undef, undef, undef, rand(1));
+                $r->enqueue($ii);
+            }
+            pass("Thread $ii exiting");
+        });
+    }
+
+    # Make sure there's nothing in the queue and threads are blocking
+    for my $ii (1..($num_threads*$cycles)) {
+        $r->dequeue();
+    }
+    sleep(1);
+    threads->yield();
+
+    is($q->pending(), 0, 'Queue is empty');
+
+    # Signal no more work is coming
+    $q->end();
+
+    is($q->pending(), undef, 'Queue is ended');
+
+    for my $thread (@threads) {
+        $thread->join;
+        pass($thread->tid." joined");
+    }
+}
+
+# Test for end() while threads are blocked and items still remain in queue
+{
+    my @items = 1..($num_threads*$cycles*$count + 1);
+    my $q = Thread::Queue->new(@items);
+    my $r = Thread::Queue->new();
+
+    my @threads;
+    for my $ii (1..$num_threads) {
+        push @threads, threads->create( sub {
+            # Thread will loop until no more work is coming
+            LOOP:
+            while (my @set = $q->dequeue($count)) {
+                foreach my $item (@set) {
+                    last LOOP if (! defined($item));
+                    pass("'$item' read from queue in thread $ii");
+                }
+                select(undef, undef, undef, rand(1));
+                $r->enqueue($ii);
+            }
+            pass("Thread $ii exiting");
+        });
+    }
+
+    # Make sure there's nothing in the queue and threads are blocking
+    for my $ii (1..($num_threads*$cycles)) {
+        $r->dequeue();
+    }
+    sleep(1);
+    threads->yield();
+
+    is($q->pending(), 1, 'Queue has one left');
+
+    # Signal no more work is coming
+    $q->end();
+
+    for my $thread (@threads) {
+        $thread->join;
+        pass($thread->tid." joined");
+    }
+
+    is($q->pending(), undef, 'Queue is ended');
+}
+
+# Test of end() send while items in queue
+{
+    my @items = 1..($num_threads*$cycles*$count + 1);
+    my $q = Thread::Queue->new(@items);
+
+    my @threads;
+    for my $ii (1..$num_threads) {
+        push @threads, threads->create( sub {
+            # Thread will loop until no more work is coming
+            LOOP:
+            while (my @set = $q->dequeue($count)) {
+                foreach my $item (@set) {
+                    last LOOP if (! defined($item));
+                    pass("'$item' read from queue in thread $ii");
+                }
+                select(undef, undef, undef, rand(1));
+            }
+            pass("Thread $ii exiting");
+        });
+    }
+
+    # Signal no more work is coming to the blocked threads, they
+    # should unblock.
+    $q->end();
+
+    for my $thread (@threads) {
+        $thread->join;
+        pass($thread->tid." joined");
+    }
+}
+
+exit(0);
+
+# EOF