This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.06
authorJerry D. Hedden <jdhedden@cpan.org>
Sat, 22 Aug 2015 20:40:50 +0000 (16:40 -0400)
committerJames E Keenan <jkeenan@cpan.org>
Sat, 22 Aug 2015 23:21:35 +0000 (19:21 -0400)
For: RT #125864

MANIFEST
Porting/Maintainers.pl
dist/Thread-Queue/lib/Thread/Queue.pm
dist/Thread-Queue/t/07_lock.t
dist/Thread-Queue/t/11_limit.t [new file with mode: 0644]

index 7210f5a..26faf67 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -3423,6 +3423,7 @@ dist/Thread-Queue/t/07_lock.t             Thread::Queue tests
 dist/Thread-Queue/t/08_nothreads.t     Thread::Queue tests
 dist/Thread-Queue/t/09_ended.t         Thread::Queue tests
 dist/Thread-Queue/t/10_timed.t Thread::Queue tests
+dist/Thread-Queue/t/11_limit.t Thread::Queue tests
 dist/Thread-Semaphore/lib/Thread/Semaphore.pm  Thread-safe semaphores
 dist/Thread-Semaphore/t/01_basic.t             Thread::Semaphore tests
 dist/Thread-Semaphore/t/02_errs.t              Thread::Semaphore tests
index 778caf4..fc734a4 100755 (executable)
@@ -1159,7 +1159,7 @@ use File::Glob qw(:case);
     # correct for this (and Thread::Semaphore, threads, and threads::shared)
     # to be under dist/ rather than cpan/
     'Thread::Queue' => {
-        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.05.tar.gz',
+        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.06.tar.gz',
         'FILES'        => q[dist/Thread-Queue],
         'EXCLUDED'     => [
             qr{^examples/},
index 316644a..ebc1c31 100644 (file)
@@ -3,7 +3,7 @@ package Thread::Queue;
 use strict;
 use warnings;
 
-our $VERSION = '3.05';
+our $VERSION = '3.06';
 $VERSION = eval $VERSION;
 
 use threads::shared 1.21;
@@ -26,14 +26,29 @@ sub enqueue
 {
     my $self = shift;
     lock(%$self);
+
     if ($$self{'ENDED'}) {
         require Carp;
         Carp::croak("'enqueue' method called on queue that has been 'end'ed");
     }
-    push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+
+    # Block if queue size exceeds any specified limit
+    my $queue = $$self{'queue'};
+    cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
+
+    # Add items to queue, and then signal other threads
+    push(@$queue, map { shared_clone($_) } @_)
         and cond_signal(%$self);
 }
 
+# Set or return the max. size for a queue
+sub limit : lvalue
+{
+    my $self = shift;
+    lock(%$self);
+    $$self{'LIMIT'};
+}
+
 # Return a count of the number of items on a queue
 sub pending
 {
@@ -47,7 +62,7 @@ sub pending
 sub end
 {
     my $self = shift;
-    lock $self;
+    lock(%$self);
     # No more data is coming
     $$self{'ENDED'} = 1;
     # Try to release at least one blocked thread
@@ -289,7 +304,7 @@ Thread::Queue - Thread-safe queues
 
 =head1 VERSION
 
-This document describes Thread::Queue version 3.05
+This document describes Thread::Queue version 3.06
 
 =head1 SYNOPSIS
 
@@ -334,6 +349,9 @@ This document describes Thread::Queue version 3.05
         # Work on $item
     }
 
+    # Set a size for a queue
+    $q->limit = 5;
+
     # Get the second item in the queue without dequeuing anything
     my $item = $q->peek(1);
 
@@ -423,7 +441,7 @@ Adds a list of items onto the end of the queue.
 Removes the requested number of items (default is 1) from the head of the
 queue, and returns them.  If the queue contains fewer than the requested
 number of items, then the thread will be blocked until the requisite number
-of items are available (i.e., until other threads <enqueue> more items).
+of items are available (i.e., until other threads C<enqueue> more items).
 
 =item ->dequeue_nb()
 
@@ -461,6 +479,20 @@ behaves the same as C<dequeue_nb>.
 Returns the number of items still in the queue.  Returns C<undef> if the queue
 has been ended (see below), and there are no more items in the queue.
 
+=item ->limit
+
+Sets the size of the queue.  If set, calls to C<enqueue()> will block until
+the number of pending items in the queue drops below the C<limit>.  The
+C<limit> does not prevent enqueuing items beyond that count:
+
+    my $q = Thread::Queue->new(1, 2);
+    $q->limit = 4;
+    $q->enqueue(3, 4, 5);   # Does not block
+    $q->enqueue(6);         # Blocks until at least 2 items are dequeued
+
+    my $size = $q->limit;   # Returns the current limit (may return 'undef')
+    $q->limit = 0;          # Queue size is now unlimited
+
 =item ->end()
 
 Declares that no more items will be added to the queue.
index 0af2db1..f9e258e 100644 (file)
@@ -29,7 +29,7 @@ ok($q, 'New queue');
 my $sm = Thread::Semaphore->new(0);
 my $st = Thread::Semaphore->new(0);
 
-my $thread = threads->create(sub {
+threads->create(sub {
     {
         lock($q);
         $sm->up();
@@ -39,7 +39,7 @@ my $thread = threads->create(sub {
         my @x = $q->extract(5,2);
         is_deeply(\@x, [6,7], 'Thread dequeues under lock');
     }
-});
+})->detach();
 
 $sm->down();
 $st->up();
@@ -47,8 +47,6 @@ my @x = $q->dequeue_nb(100);
 is_deeply(\@x, [1..5,8..10], 'Main dequeues');
 threads::yield();
 
-$thread->join;
-
 exit(0);
 
 # EOF
diff --git a/dist/Thread-Queue/t/11_limit.t b/dist/Thread-Queue/t/11_limit.t
new file mode 100644 (file)
index 0000000..a2ab918
--- /dev/null
@@ -0,0 +1,101 @@
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+    if (! $Config{'useithreads'}) {
+        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+        exit(0);
+    }
+    if (! $Config{'d_select'}) {
+        print("1..0 # SKIP 'select()' not available for testing\n");
+        exit(0);
+    }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+plan tests => 8;
+
+my $q = Thread::Queue->new();
+my $rpt = Thread::Queue->new();
+
+my $th = threads->create( sub {
+    # (1) Set queue limit, and report it
+    $q->limit = 3;
+    $rpt->enqueue($q->limit);
+
+    # (3) Fetch an item from queue
+    my $item = $q->dequeue();
+    is($item, 1, 'Dequeued item 1');
+    # Report queue count
+    $rpt->enqueue($q->pending());
+
+    # q = (2, 3, 4, 5); r = (4)
+
+    # (4) Enqueue more items - will block
+    $q->enqueue(6, 7);
+    # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3)
+
+    # (6) Get reports from main
+    my @items = $rpt->dequeue(5);
+    is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports');
+
+    # Dequeue all items
+    @items = $q->dequeue_nb(99);
+    is_deeply(\@items, [5, 'foo', 6, 7], 'Queue items');
+});
+
+# (2) Read queue limit from thread
+my $item = $rpt->dequeue();
+is($item, $q->limit, 'Queue limit set');
+# Send items
+$q->enqueue(1, 2, 3, 4, 5);
+
+# (5) Read queue count
+$item = $rpt->dequeue;
+# q = (2, 3, 4, 5); r = ()
+is($item, $q->pending(), 'Queue count');
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (2, 3, 4, 5); r = (4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 2, 'Dequeued item 2');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5); r = (4, 3)
+
+# 'insert' doesn't care about queue limit
+$q->insert(3, 'foo');
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5, 'foo'); r = (4, 3, 4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 3, 'Dequeued item 3');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (4, 5, 'foo'); r = (4, 3, 4, 3)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 4, 'Dequeued item 4');
+# Thread is now unblocked
+
+# Handshake with thread
+$rpt->enqueue('go');
+
+# (7) - Done
+$th->join;
+
+exit(0);
+
+# EOF