X-Git-Url: https://perl5.git.perl.org/perl5.git/blobdiff_plain/83272a45226e83bd136d713158e9b44ace2dbc8d..54c7876f687059dc7b09511db127f9ac439f8d8d:/lib/Thread/Queue.pm?ds=sidebyside diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm index 3b5c7c9..f436f04 100644 --- a/lib/Thread/Queue.pm +++ b/lib/Thread/Queue.pm @@ -1,101 +1,527 @@ package Thread::Queue; -use threads::shared; use strict; +use warnings; -our $VERSION = '2.00'; +our $VERSION = '2.03'; + +use threads::shared 0.96; +use Scalar::Util 1.10 qw(looks_like_number); + +# Predeclarations for internal functions +my ($make_shared, $validate_count, $validate_index); + +# Create a new queue possibly pre-populated with items +sub new +{ + my $class = shift; + my @queue :shared = map { $make_shared->($_) } @_; + return bless(\@queue, $class); +} + +# Add items to the tail of a queue +sub enqueue +{ + my $queue = shift; + lock(@$queue); + push(@$queue, map { $make_shared->($_) } @_) + and cond_signal(@$queue); +} + +# Return a count of the number of items on a queue +sub pending +{ + my $queue = shift; + lock(@$queue); + return scalar(@$queue); +} + +# Return 1 or more items from the head of a queue, blocking if needed +sub dequeue +{ + my $queue = shift; + lock(@$queue); + + my $count = @_ ? $validate_count->(shift) : 1; + + # Wait for requisite number of items + cond_wait(@$queue) until (@$queue >= $count); + cond_signal(@$queue) if (@$queue > $count); + + # Return single item + return shift(@$queue) if ($count == 1); + + # Return multiple items + my @items; + push(@items, shift(@$queue)) for (1..$count); + return @items; +} + +# Return items from the head of a queue with no blocking +sub dequeue_nb +{ + my $queue = shift; + lock(@$queue); + + my $count = @_ ? $validate_count->(shift) : 1; + + # Return single item + return shift(@$queue) if ($count == 1); + + # Return multiple items + my @items; + for (1..$count) { + last if (! @$queue); + push(@items, shift(@$queue)); + } + return @items; +} + +# Return an item without removing it from a queue +sub peek +{ + my $queue = shift; + lock(@$queue); + my $index = @_ ? $validate_index->(shift) : 0; + return $$queue[$index]; +} + +# Insert items anywhere into a queue +sub insert +{ + my $queue = shift; + lock(@$queue); + + my $index = $validate_index->(shift); + + return if (! @_); # Nothing to insert + + # Support negative indices + if ($index < 0) { + $index += @$queue; + if ($index < 0) { + $index = 0; + } + } + + # Dequeue items from $index onward + my @tmp; + while (@$queue > $index) { + unshift(@tmp, pop(@$queue)) + } + + # Add new items to the queue + push(@$queue, map { $make_shared->($_) } @_); + + # Add previous items back onto the queue + push(@$queue, @tmp); + + # Soup's up + cond_signal(@$queue); +} + +# Remove items from anywhere in a queue +sub extract +{ + my $queue = shift; + lock(@$queue); + + my $index = @_ ? $validate_index->(shift) : 0; + my $count = @_ ? $validate_count->(shift) : 1; + + # Support negative indices + if ($index < 0) { + $index += @$queue; + if ($index < 0) { + $count += $index; + return if ($count <= 0); # Beyond the head of the queue + return $queue->dequeue_nb($count); # Extract from the head + } + } + + # Dequeue items from $index+$count onward + my @tmp; + while (@$queue > ($index+$count)) { + unshift(@tmp, pop(@$queue)) + } + + # Extract desired items + my @items; + unshift(@items, pop(@$queue)) while (@$queue > $index); + + # Add back any removed items + push(@$queue, @tmp); + + # Return single item + return $items[0] if ($count == 1); + + # Return multiple items + return @items; +} + +### Internal Functions ### + +# Create a thread-shared version of a complex data structure or object +$make_shared = sub { + my $item = shift; + + # If already thread-shared, then just return the input item + return $item if (threads::shared::is_shared($item)); + + # Make copies of array, hash and scalar refs + my $copy; + if (my $ref_type = Scalar::Util::reftype($item)) { + # Copy an array ref + if ($ref_type eq 'ARRAY') { + # Make empty shared array ref + $copy = &share([]); + # Recursively copy and add contents + push(@$copy, map { $make_shared->($_) } @$item); + } + + # Copy a hash ref + elsif ($ref_type eq 'HASH') { + # Make empty shared hash ref + $copy = &share({}); + # Recursively copy and add contents + foreach my $key (keys(%{$item})) { + $copy->{$key} = $make_shared->($item->{$key}); + } + } + + # Copy a scalar ref + elsif ($ref_type eq 'SCALAR') { + $copy = \do{ my $scalar = $$item; }; + share($copy); + # Clone READONLY flag + if (Internals::SvREADONLY($$item)) { + Internals::SvREADONLY($$copy, 1); + } + } + + # Copy of a ref of a ref + elsif ($ref_type eq 'REF') { + my $tmp = $make_shared->($$item); + $copy = \$tmp; + share($copy); + } + } + + # If no copy is created above, then just return the input item + # NOTE: This will end up generating an error for anything + # other than an ordinary scalar + return $item if (! defined($copy)); + + # Clone READONLY flag + if (Internals::SvREADONLY($item)) { + Internals::SvREADONLY($copy, 1); + } + + # If input item is an object, then bless the copy into the same class + if (my $class = Scalar::Util::blessed($item)) { + bless($copy, $class); + } + + return $copy; +}; + +# Check value of the requested index +$validate_index = sub { + my $index = shift; + + if (! looks_like_number($index) || (int($index) != $index)) { + require Carp; + my ($method) = (caller(1))[3]; + $method =~ s/Thread::Queue:://; + $index = 'undef' if (! defined($index)); + Carp::croak("Invalid 'index' argument ($index) to '$method' method"); + } + + return $index; +}; + +# Check value of the requested count +$validate_count = sub { + my $count = shift; + + if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) { + require Carp; + my ($method) = (caller(1))[3]; + $method =~ s/Thread::Queue:://; + $count = 'undef' if (! defined($count)); + Carp::croak("Invalid 'count' argument ($count) to '$method' method"); + } + + return $count; +}; + +1; =head1 NAME -Thread::Queue - thread-safe queues +Thread::Queue - Thread-safe queues + +=head1 VERSION + +This document describes Thread::Queue version 2.03 =head1 SYNOPSIS + use strict; + use warnings; + + use threads; use Thread::Queue; - my $q = new Thread::Queue; - $q->enqueue("foo", "bar"); - my $foo = $q->dequeue; # The "bar" is still in the queue. - my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was empty - my $left = $q->pending; # returns the number of items still in the queue + + my $q = Thread::Queue->new(); # A new empty queue + + # Worker thread + my $thr = threads->create(sub { + while (my $item = $q->dequeue()) { + # Do work on $item + } + })->detach(); + + # Send work to the thread + $q->enqueue($item1, ...); + + + # Count of items in the queue + my $left = $q->pending(); + + # Non-blocking dequeue + if (defined(my $item = $q->dequeue_nb())) { + # Work on $item + } + + # Get the second item in the queue without dequeuing anything + my $item = $q->peek(1); + + # Insert two items into the queue just behind the head + $q->insert(1, $item1, $item2); + + # Extract the last two items on the queue + my ($item1, $item2) = $q->extract(-2, 2); =head1 DESCRIPTION -A queue, as implemented by C is a thread-safe -data structure much like a list. Any number of threads can safely -add elements to the end of the list, or remove elements from the head -of the list. (Queues don't permit adding or removing elements from -the middle of the list). +This module provides thread-safe FIFO queues that can be accessed safely by +any number of threads. + +Any data types supported by L can be passed via queues: + +=over + +=item Ordinary scalars + +=item Array refs + +=item Hash refs + +=item Scalar refs + +=item Objects based on the above + +=back + +Ordinary scalars are added to queues as they are. + +If not already thread-shared, the other complex data types will be cloned +(recursively, if needed, and including any Cings and read-only +settings) into thread-shared structures before being placed onto a queue. -=head1 FUNCTIONS AND METHODS +For example, the following would cause L to create a empty, +shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' +and 'baz' from C<@ary> into it, and then place that shared reference onto +the queue: -=over 8 + my @ary = qw/foo bar baz/; + $q->enqueue(\@ary); -=item new +However, for the following, the items are already shared, so their references +are added directly to the queue, and no cloning takes place: -The C function creates a new empty queue. + my @ary :shared = qw/foo bar baz/; + $q->enqueue(\@ary); -=item enqueue LIST + my $obj = &shared({}); + $$obj{'foo'} = 'bar'; + $$obj{'qux'} = 99; + bless($obj, 'My::Class'); + $q->enqueue($obj); -The C method adds a list of scalars on to the end of the queue. -The queue will grow as needed to accommodate the list. +See L for caveats related to passing objects via queues. -=item dequeue +=head1 QUEUE CREATION -The C method removes a scalar from the head of the queue and -returns it. If the queue is currently empty, C will block the -thread until another thread Cs a scalar. +=over -=item dequeue_nb +=item ->new() -The C method, like the C method, removes a scalar from -the head of the queue and returns it. Unlike C, though, -C won't block if the queue is empty, instead returning -C. +Creates a new empty queue. -=item pending +=item ->new(LIST) -The C method returns the number of items still in the queue. +Creates a new queue pre-populated with the provided list of items. =back -=head1 SEE ALSO +=head1 BASIC METHODS -L, L +The following methods deal with queues on a FIFO basis. -=cut +=over -sub new { - my $class = shift; - my @q : shared = @_; - return bless \@q, $class; -} +=item ->enqueue(LIST) -sub dequeue { - my $q = shift; - lock(@$q); - cond_wait @$q until @$q; - cond_signal @$q if @$q > 1; - return shift @$q; -} +Adds a list of items onto the end of the queue. -sub dequeue_nb { - my $q = shift; - lock(@$q); - return shift @$q; -} +=item ->dequeue() -sub enqueue { - my $q = shift; - lock(@$q); - push @$q, @_ and cond_signal @$q; -} +=item ->dequeue(COUNT) -sub pending { - my $q = shift; - lock(@$q); - return scalar(@$q); -} +Removes the requested number of items (default is 1) from the head of the +queue, and returns them. If the queue contains fewer than the requested +number of items, then the thread will be blocked until the requisite number +of items are available (i.e., until other threads more items). -1; +=item ->dequeue_nb() + +=item ->dequeue_nb(COUNT) + +Removes the requested number of items (default is 1) from the head of the +queue, and returns them. If the queue contains fewer than the requested +number of items, then it immediately (i.e., non-blocking) returns whatever +items there are on the queue. If the queue is empty, then C is +returned. + +=item ->pending() + +Returns the number of items still in the queue. + +=back + +=head1 ADVANCED METHODS + +The following methods can be used to manipulate items anywhere in a queue. + +To prevent the contents of a queue from being modified by another thread +while it is being examined and/or changed, L the queue inside a local block: + + { + lock($q); # Keep other threads from changing the queue's contents + my $item = $q->peek(); + if ($item ...) { + ... + } + } + # Queue is now unlocked + +=over + +=item ->peek() + +=item ->peek(INDEX) + +Returns an item from the queue without dequeuing anything. Defaults to the +the head of queue (at index position 0) if no index is specified. Negative +index values are supported as with L (i.e., -1 +is the end of the queue, -2 is next to last, and so on). + +If no items exists at the specified index (i.e., the queue is empty, or the +index is beyond the number of items on the queue), then C is returned. + +Remember, the returned item is not removed from the queue, so manipulating a +Ced at reference affects the item on the queue. + +=item ->insert(INDEX, LIST) + +Adds the list of items to the queue at the specified index position (0 +is the head of the list). Any existing items at and beyond that position are +pushed back past the newly added items: + + $q->enqueue(1, 2, 3, 4); + $q->insert(1, qw/foo bar/); + # Queue now contains: 1, foo, bar, 2, 3, 4 +Specifying an index position greater than the number of items in the queue +just adds the list to the end. +Negative index positions are supported: + + $q->enqueue(1, 2, 3, 4); + $q->insert(-2, qw/foo bar/); + # Queue now contains: 1, 2, foo, bar, 3, 4 + +Specifying a negative index position greater than the number of items in the +queue adds the list to the head of the queue. + +=item ->extract() + +=item ->extract(INDEX) + +=item ->extract(INDEX, COUNT) + +Removes and returns the specified number of items (defaults to 1) from the +specified index position in the queue (0 is the head of the queue). When +called with no arguments, C operates the same as C. + +This method is non-blocking, and will return only as many items as are +available to fulfill the request: + + $q->enqueue(1, 2, 3, 4); + my $item = $q->extract(2) # Returns 3 + # Queue now contains: 1, 2, 4 + my @items = $q->extract(1, 3) # Returns (2, 4) + # Queue now contains: 1 + +Specifying an index position greater than the number of items in the +queue results in C or an empty list being returned. + + $q->enqueue('foo'); + my $nada = $q->extract(3) # Returns undef + my @nada = $q->extract(1, 3) # Returns () + +Negative index positions are supported. Specifying a negative index position +greater than the number of items in the queue may return items from the head +of the queue (similar to C) if the count overlaps the head of the +queue from the specified position (i.e. if queue size + index + count is +greater than zero): + + $q->enqueue(qw/foo bar baz/); + my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 + my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 + # Queue now contains: bar, baz + my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 + +=back + +=head1 LIMITATIONS + +Passing objects on queues may not work if the objects' classes do not support +sharing. See L for more. + +Passing array/hash refs that contain objects may not work for Perl prior to +5.10.0. + +=head1 SEE ALSO + +Thread::Queue Discussion Forum on CPAN: +L + +Annotated POD for Thread::Queue: +L + +L, L + +=head1 MAINTAINER + +Jerry D. Hedden, Sjdhedden AT cpan DOT orgE> + +=head1 LICENSE + +This program is free software; you can redistribute it and/or modify it under +the same terms as Perl itself. + +=cut