use strict;
use warnings;
-our $VERSION = '3.05';
+our $VERSION = '3.06';
$VERSION = eval $VERSION;
use threads::shared 1.21;
{
my $self = shift;
lock(%$self);
+
if ($$self{'ENDED'}) {
require Carp;
Carp::croak("'enqueue' method called on queue that has been 'end'ed");
}
- push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+
+ # Block if queue size exceeds any specified limit
+ my $queue = $$self{'queue'};
+ cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
+
+ # Add items to queue, and then signal other threads
+ push(@$queue, map { shared_clone($_) } @_)
and cond_signal(%$self);
}
+# Set or return the max. size for a queue
+sub limit : lvalue
+{
+ my $self = shift;
+ lock(%$self);
+ $$self{'LIMIT'};
+}
+
# Return a count of the number of items on a queue
sub pending
{
sub end
{
my $self = shift;
- lock $self;
+ lock(%$self);
# No more data is coming
$$self{'ENDED'} = 1;
# Try to release at least one blocked thread
=head1 VERSION
-This document describes Thread::Queue version 3.05
+This document describes Thread::Queue version 3.06
=head1 SYNOPSIS
# Work on $item
}
+ # Set a size for a queue
+ $q->limit = 5;
+
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
Removes the requested number of items (default is 1) from the head of the
queue, and returns them. If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
-of items are available (i.e., until other threads <enqueue> more items).
+of items are available (i.e., until other threads C<enqueue> more items).
=item ->dequeue_nb()
Returns the number of items still in the queue. Returns C<undef> if the queue
has been ended (see below), and there are no more items in the queue.
+=item ->limit
+
+Sets the size of the queue. If set, calls to C<enqueue()> will block until
+the number of pending items in the queue drops below the C<limit>. The
+C<limit> does not prevent enqueuing items beyond that count:
+
+ my $q = Thread::Queue->new(1, 2);
+ $q->limit = 4;
+ $q->enqueue(3, 4, 5); # Does not block
+ $q->enqueue(6); # Blocks until at least 2 items are dequeued
+
+ my $size = $q->limit; # Returns the current limit (may return 'undef')
+ $q->limit = 0; # Queue size is now unlimited
+
=item ->end()
Declares that no more items will be added to the queue.
--- /dev/null
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+ if (! $Config{'d_select'}) {
+ print("1..0 # SKIP 'select()' not available for testing\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+plan tests => 8;
+
+my $q = Thread::Queue->new();
+my $rpt = Thread::Queue->new();
+
+my $th = threads->create( sub {
+ # (1) Set queue limit, and report it
+ $q->limit = 3;
+ $rpt->enqueue($q->limit);
+
+ # (3) Fetch an item from queue
+ my $item = $q->dequeue();
+ is($item, 1, 'Dequeued item 1');
+ # Report queue count
+ $rpt->enqueue($q->pending());
+
+ # q = (2, 3, 4, 5); r = (4)
+
+ # (4) Enqueue more items - will block
+ $q->enqueue(6, 7);
+ # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3)
+
+ # (6) Get reports from main
+ my @items = $rpt->dequeue(5);
+ is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports');
+
+ # Dequeue all items
+ @items = $q->dequeue_nb(99);
+ is_deeply(\@items, [5, 'foo', 6, 7], 'Queue items');
+});
+
+# (2) Read queue limit from thread
+my $item = $rpt->dequeue();
+is($item, $q->limit, 'Queue limit set');
+# Send items
+$q->enqueue(1, 2, 3, 4, 5);
+
+# (5) Read queue count
+$item = $rpt->dequeue;
+# q = (2, 3, 4, 5); r = ()
+is($item, $q->pending(), 'Queue count');
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (2, 3, 4, 5); r = (4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 2, 'Dequeued item 2');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5); r = (4, 3)
+
+# 'insert' doesn't care about queue limit
+$q->insert(3, 'foo');
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5, 'foo'); r = (4, 3, 4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 3, 'Dequeued item 3');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (4, 5, 'foo'); r = (4, 3, 4, 3)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 4, 'Dequeued item 4');
+# Thread is now unblocked
+
+# Handshake with thread
+$rpt->enqueue('go');
+
+# (7) - Done
+$th->join;
+
+exit(0);
+
+# EOF