dist/Thread-Queue/t/06_insert.t Thread::Queue tests
dist/Thread-Queue/t/07_lock.t Thread::Queue tests
dist/Thread-Queue/t/08_nothreads.t Thread::Queue tests
+dist/Thread-Queue/t/09_ended.t Thread::Queue tests
dist/Thread-Semaphore/lib/Thread/Semaphore.pm Thread-safe semaphores
dist/Thread-Semaphore/t/01_basic.t Thread::Semaphore tests
dist/Thread-Semaphore/t/02_errs.t Thread::Semaphore tests
'Thread::Queue' => {
'MAINTAINER' => 'jdhedden',
- 'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-2.12.tar.gz',
+ 'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.01.tar.gz',
'FILES' => q[dist/Thread-Queue],
'EXCLUDED' => [
- qw( examples/queue.pl
- t/00_load.t
+ qr{^examples/},
+ qw( t/00_load.t
t/99_pod.t
t/test.pl
),
use strict;
use warnings;
-our $VERSION = '2.12';
+our $VERSION = '3.01';
$VERSION = eval $VERSION;
use threads::shared 1.21;
{
my $class = shift;
my @queue :shared = map { shared_clone($_) } @_;
- return bless(\@queue, $class);
+ my %self :shared = ( 'queue' => \@queue );
+ return bless(\%self, $class);
}
# Add items to the tail of a queue
sub enqueue
{
- my $queue = shift;
- lock(@$queue);
- push(@$queue, map { shared_clone($_) } @_)
- and cond_signal(@$queue);
+ my $self = shift;  # Invocant: the queue object, now a shared hash instead of the item array
+ lock(%$self);  # Lock the object hash; the 'ENDED' flag and the 'queue' array are both read under it
+ if ($$self{'ENDED'}) {  # Flag is set by end(): refuse to accept further items
+ require Carp;  # Loaded only on this error path
+ Carp::croak("'enqueue' method called on queue that has been 'end'ed");
+ }
+ push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+ and cond_signal(%$self);  # push() returns the new length, so a waiter is signalled only when the queue is non-empty
}
# Return a count of the number of items on a queue
sub pending
{
- my $queue = shift;
- lock(@$queue);
- return scalar(@$queue);
+ my $self = shift;  # Invocant: the queue object (shared hash)
+ lock(%$self);  # Hold the object lock while reading 'ENDED' and the item array
+ return if ($$self{'ENDED'} && ! @{$$self{'queue'}});  # Ended and drained: bare return (undef in scalar context)
+ return scalar(@{$$self{'queue'}});  # Otherwise the number of items currently queued
+}
+
+# Indicate that no more data will enter the queue
+sub end
+{
+ my $self = shift;
+ # Lock the object hash itself, spelled the same way as in every other method
+ lock(%$self);
+ # No more data is coming
+ $$self{'ENDED'} = 1;
+ # Try to release at least one blocked thread; a woken dequeue() passes the
+ # signal on, because it calls cond_signal() again when 'ENDED' is set
+ cond_signal(%$self);
}
# Return 1 or more items from the head of a queue, blocking if needed
sub dequeue
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $count = @_ ? $validate_count->(shift) : 1;
# Wait for requisite number of items
- cond_wait(@$queue) until (@$queue >= $count);
- cond_signal(@$queue) if (@$queue > $count);
+ cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
+ cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+ # If no longer blocking, try getting whatever is left on the queue
+ return $self->dequeue_nb($count) if ($$self{'ENDED'});
# Return single item
return shift(@$queue) if ($count == 1);
# Return items from the head of a queue with no blocking
sub dequeue_nb
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $count = @_ ? $validate_count->(shift) : 1;
# Return an item without removing it from a queue
sub peek
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;  # Invocant: the queue object (shared hash)
+ lock(%$self);  # Lock the object hash; the items now live under its 'queue' key
my $index = @_ ? $validate_index->(shift) : 0;
- return $$queue[$index];
+ return $$self{'queue'}[$index];  # Plain element lookup: the item is not removed from the queue
}
# Insert items anywhere into a queue
sub insert
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+
+ if ($$self{'ENDED'}) {
+ require Carp;
+ Carp::croak("'insert' method called on queue that has been 'end'ed");
+ }
+
+ my $queue = $$self{'queue'};
my $index = $validate_index->(shift);
push(@$queue, @tmp);
# Soup's up
- cond_signal(@$queue);
+ cond_signal(%$self);
}
# Remove items from anywhere in a queue
sub extract
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $index = @_ ? $validate_index->(shift) : 0;
my $count = @_ ? $validate_count->(shift) : 1;
if ($index < 0) {
$count += $index;
return if ($count <= 0); # Beyond the head of the queue
- return $queue->dequeue_nb($count); # Extract from the head
+ return $self->dequeue_nb($count); # Extract from the head
}
}
=head1 VERSION
-This document describes Thread::Queue version 2.12
+This document describes Thread::Queue version 3.01
=head1 SYNOPSIS
my $q = Thread::Queue->new(); # A new empty queue
# Worker thread
- my $thr = threads->create(sub {
- while (my $item = $q->dequeue()) {
- # Do work on $item
- }
- })->detach();
+ my $thr = threads->create(
+ sub {
+ # Thread will loop until no more work
+ while (defined(my $item = $q->dequeue())) {
+ # Do work on $item
+ ...
+ }
+ }
+ );
# Send work to the thread
$q->enqueue($item1, ...);
+ # Signal that there is no more work to be sent
+ $q->end();
+ # Join up with the thread when it finishes
+ $thr->join();
+ ...
# Count of items in the queue
my $left = $q->pending();
=item ->pending()
-Returns the number of items still in the queue.
+Returns the number of items still in the queue. Returns C<undef> if the queue
+has been ended (see below), and there are no more items in the queue.
+
+=item ->end()
+
+Declares that no more items will be added to the queue.
+
+All threads blocking on C<dequeue()> calls will be unblocked with any
+remaining items in the queue and/or C<undef> being returned. Any subsequent
+calls to C<dequeue()> will behave like C<dequeue_nb()>.
+
+Once ended, no more items may be placed in the queue.
=back
L<threads>, L<threads::shared>
+Sample code in the I<examples> directory of this distribution on CPAN.
+
=head1 MAINTAINER
Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
--- /dev/null
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+ if (! $Config{'d_select'}) {
+ print("1..0 # SKIP 'select()' not available for testing\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+my $num_threads = 3;
+my $cycles = 2;
+my $count = 2;
+plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;
+
+# Test for end() while threads are blocked and no more items are in the queue
+{
+ my @items = 1..($num_threads*$cycles*$count);  # Enough items for $num_threads*$cycles sets of $count
+ my $q = Thread::Queue->new(@items);
+ my $r = Thread::Queue->new();  # Report queue: a worker enqueues its number after each processed set
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));  # Pause for a random fraction of a second
+ $r->enqueue($ii);
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Wait until every set has been processed, so the queue is empty and the
+ # threads are blocking
+ for my $ii (1..($num_threads*$cycles)) {  # One report is consumed per processed set
+ $r->dequeue();
+ }
+ sleep(1);
+ threads->yield();
+
+ is($q->pending(), 0, 'Queue is empty');
+
+ # Signal no more work is coming
+ $q->end();
+
+ is($q->pending(), undef, 'Queue is ended');
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+}
+
+# Test for end() while threads are blocked and items still remain in queue
+{
+ my @items = 1..($num_threads*$cycles*$count + 1);  # One item more than $num_threads*$cycles sets of $count
+ my $q = Thread::Queue->new(@items);
+ my $r = Thread::Queue->new();  # Report queue: a worker enqueues its number after each processed set
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));  # Pause for a random fraction of a second
+ $r->enqueue($ii);
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Wait until every full set has been processed, so only the extra item is
+ # left in the queue and the threads are blocking
+ for my $ii (1..($num_threads*$cycles)) {  # One report is consumed per processed set
+ $r->dequeue();
+ }
+ sleep(1);
+ threads->yield();
+
+ is($q->pending(), 1, 'Queue has one left');
+
+ # Signal no more work is coming
+ $q->end();
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+
+ is($q->pending(), undef, 'Queue is ended');
+}
+
+# Test of end() sent while items are still in the queue
+{
+ my @items = 1..($num_threads*$cycles*$count + 1);  # One item more than $num_threads*$cycles sets of $count
+ my $q = Thread::Queue->new(@items);
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));  # Pause for a random fraction of a second
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Signal no more work is coming right away, without waiting for the queue
+ # to drain; the threads should work off what is left and then exit.
+ $q->end();
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+}
+
+exit(0);
+
+# EOF