This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.01
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
1fd4700e 6our $VERSION = '3.01';
23e2fda9 7$VERSION = eval $VERSION;
54c7876f 8
09782346 9use threads::shared 1.21;
ac9d3a9d 10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
54c7876f 11
09782346
JH
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
54c7876f 15# Predeclarations for internal functions
09782346 16my ($validate_count, $validate_index);
54c7876f
JH
17
18# Create a new queue possibly pre-populated with items
19sub new
20{
21 my $class = shift;
09782346 22 my @queue :shared = map { shared_clone($_) } @_;
1fd4700e
JH
23 my %self :shared = ( 'queue' => \@queue );
24 return bless(\%self, $class);
54c7876f
JH
25}
26
27# Add items to the tail of a queue
28sub enqueue
29{
1fd4700e
JH
30 my $self = shift;
31 lock(%$self);
32 if ($$self{'ENDED'}) {
33 require Carp;
34 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
35 }
36 push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
37 and cond_signal(%$self);
54c7876f
JH
38}
39
40# Return a count of the number of items on a queue
41sub pending
42{
1fd4700e
JH
43 my $self = shift;
44 lock(%$self);
45 return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
46 return scalar(@{$$self{'queue'}});
47}
48
49# Indicate that no more data will enter the queue
50sub end
51{
52 my $self = shift;
53 lock $self;
54 # No more data is coming
55 $$self{'ENDED'} = 1;
56 # Try to release at least one blocked thread
57 cond_signal(%$self);
54c7876f
JH
58}
59
60# Return 1 or more items from the head of a queue, blocking if needed
61sub dequeue
62{
1fd4700e
JH
63 my $self = shift;
64 lock(%$self);
65 my $queue = $$self{'queue'};
54c7876f
JH
66
67 my $count = @_ ? $validate_count->(shift) : 1;
68
69 # Wait for requisite number of items
1fd4700e
JH
70 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
71 cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
72
73 # If no longer blocking, try getting whatever is left on the queue
74 return $self->dequeue_nb($count) if ($$self{'ENDED'});
54c7876f
JH
75
76 # Return single item
77 return shift(@$queue) if ($count == 1);
78
79 # Return multiple items
80 my @items;
81 push(@items, shift(@$queue)) for (1..$count);
82 return @items;
83}
84
85# Return items from the head of a queue with no blocking
86sub dequeue_nb
87{
1fd4700e
JH
88 my $self = shift;
89 lock(%$self);
90 my $queue = $$self{'queue'};
54c7876f
JH
91
92 my $count = @_ ? $validate_count->(shift) : 1;
93
94 # Return single item
95 return shift(@$queue) if ($count == 1);
96
97 # Return multiple items
98 my @items;
99 for (1..$count) {
100 last if (! @$queue);
101 push(@items, shift(@$queue));
102 }
103 return @items;
104}
105
106# Return an item without removing it from a queue
107sub peek
108{
1fd4700e
JH
109 my $self = shift;
110 lock(%$self);
54c7876f 111 my $index = @_ ? $validate_index->(shift) : 0;
1fd4700e 112 return $$self{'queue'}[$index];
54c7876f
JH
113}
114
115# Insert items anywhere into a queue
116sub insert
117{
1fd4700e
JH
118 my $self = shift;
119 lock(%$self);
120
121 if ($$self{'ENDED'}) {
122 require Carp;
123 Carp::croak("'insert' method called on queue that has been 'end'ed");
124 }
125
126 my $queue = $$self{'queue'};
54c7876f
JH
127
128 my $index = $validate_index->(shift);
129
130 return if (! @_); # Nothing to insert
131
132 # Support negative indices
133 if ($index < 0) {
134 $index += @$queue;
135 if ($index < 0) {
136 $index = 0;
137 }
138 }
139
140 # Dequeue items from $index onward
141 my @tmp;
142 while (@$queue > $index) {
143 unshift(@tmp, pop(@$queue))
144 }
145
146 # Add new items to the queue
09782346 147 push(@$queue, map { shared_clone($_) } @_);
54c7876f
JH
148
149 # Add previous items back onto the queue
150 push(@$queue, @tmp);
151
152 # Soup's up
1fd4700e 153 cond_signal(%$self);
54c7876f
JH
154}
155
156# Remove items from anywhere in a queue
157sub extract
158{
1fd4700e
JH
159 my $self = shift;
160 lock(%$self);
161 my $queue = $$self{'queue'};
54c7876f
JH
162
163 my $index = @_ ? $validate_index->(shift) : 0;
164 my $count = @_ ? $validate_count->(shift) : 1;
165
166 # Support negative indices
167 if ($index < 0) {
168 $index += @$queue;
169 if ($index < 0) {
170 $count += $index;
171 return if ($count <= 0); # Beyond the head of the queue
1fd4700e 172 return $self->dequeue_nb($count); # Extract from the head
54c7876f
JH
173 }
174 }
175
176 # Dequeue items from $index+$count onward
177 my @tmp;
178 while (@$queue > ($index+$count)) {
179 unshift(@tmp, pop(@$queue))
180 }
181
182 # Extract desired items
183 my @items;
184 unshift(@items, pop(@$queue)) while (@$queue > $index);
185
186 # Add back any removed items
187 push(@$queue, @tmp);
188
189 # Return single item
190 return $items[0] if ($count == 1);
191
192 # Return multiple items
193 return @items;
194}
195
196### Internal Functions ###
197
54c7876f
JH
198# Check value of the requested index
199$validate_index = sub {
200 my $index = shift;
201
e336069e
JH
202 if (! defined($index) ||
203 ! looks_like_number($index) ||
204 (int($index) != $index))
205 {
54c7876f
JH
206 require Carp;
207 my ($method) = (caller(1))[3];
208 $method =~ s/Thread::Queue:://;
209 $index = 'undef' if (! defined($index));
210 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
211 }
212
213 return $index;
214};
215
216# Check value of the requested count
217$validate_count = sub {
218 my $count = shift;
219
e336069e
JH
220 if (! defined($count) ||
221 ! looks_like_number($count) ||
222 (int($count) != $count) ||
223 ($count < 1))
224 {
54c7876f
JH
225 require Carp;
226 my ($method) = (caller(1))[3];
227 $method =~ s/Thread::Queue:://;
228 $count = 'undef' if (! defined($count));
229 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
230 }
231
232 return $count;
233};
234
2351;
9c6f8578 236
d516a115
JH
237=head1 NAME
238
54c7876f
JH
239Thread::Queue - Thread-safe queues
240
241=head1 VERSION
242
1fd4700e 243This document describes Thread::Queue version 3.01
d516a115
JH
244
245=head1 SYNOPSIS
246
54c7876f
JH
247 use strict;
248 use warnings;
249
250 use threads;
d516a115 251 use Thread::Queue;
54c7876f
JH
252
253 my $q = Thread::Queue->new(); # A new empty queue
254
255 # Worker thread
1fd4700e
JH
256 my $thr = threads->create(
257 sub {
258 # Thread will loop until no more work
259 while (defined(my $item = $q->dequeue())) {
260 # Do work on $item
261 ...
262 }
263 }
264 );
54c7876f
JH
265
266 # Send work to the thread
267 $q->enqueue($item1, ...);
1fd4700e
JH
268 # Signal that there is no more work to be sent
269 $q->end();
270 # Join up with the thread when it finishes
271 $thr->join();
54c7876f 272
1fd4700e 273 ...
54c7876f
JH
274
275 # Count of items in the queue
276 my $left = $q->pending();
277
278 # Non-blocking dequeue
279 if (defined(my $item = $q->dequeue_nb())) {
280 # Work on $item
281 }
282
283 # Get the second item in the queue without dequeuing anything
284 my $item = $q->peek(1);
285
286 # Insert two items into the queue just behind the head
287 $q->insert(1, $item1, $item2);
288
289 # Extract the last two items on the queue
290 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 291
a99072da
HM
292=head1 DESCRIPTION
293
54c7876f
JH
294This module provides thread-safe FIFO queues that can be accessed safely by
295any number of threads.
296
297Any data types supported by L<threads::shared> can be passed via queues:
298
299=over
300
301=item Ordinary scalars
302
303=item Array refs
304
305=item Hash refs
306
307=item Scalar refs
308
309=item Objects based on the above
310
311=back
312
313Ordinary scalars are added to queues as they are.
314
315If not already thread-shared, the other complex data types will be cloned
316(recursively, if needed, and including any C<bless>ings and read-only
317settings) into thread-shared structures before being placed onto a queue.
a99072da 318
54c7876f
JH
319For example, the following would cause L<Thread::Queue> to create a empty,
320shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
321and 'baz' from C<@ary> into it, and then place that shared reference onto
322the queue:
a99072da 323
54c7876f
JH
324 my @ary = qw/foo bar baz/;
325 $q->enqueue(\@ary);
a99072da 326
54c7876f
JH
327However, for the following, the items are already shared, so their references
328are added directly to the queue, and no cloning takes place:
a99072da 329
54c7876f
JH
330 my @ary :shared = qw/foo bar baz/;
331 $q->enqueue(\@ary);
a99072da 332
54c7876f
JH
333 my $obj = &shared({});
334 $$obj{'foo'} = 'bar';
335 $$obj{'qux'} = 99;
336 bless($obj, 'My::Class');
337 $q->enqueue($obj);
a99072da 338
54c7876f 339See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 340
54c7876f 341=head1 QUEUE CREATION
a99072da 342
54c7876f 343=over
a99072da 344
54c7876f 345=item ->new()
a99072da 346
54c7876f 347Creates a new empty queue.
a99072da 348
54c7876f 349=item ->new(LIST)
a99072da 350
54c7876f 351Creates a new queue pre-populated with the provided list of items.
a99072da
HM
352
353=back
354
54c7876f 355=head1 BASIC METHODS
a99072da 356
54c7876f 357The following methods deal with queues on a FIFO basis.
bbc7dcd2 358
54c7876f 359=over
d516a115 360
54c7876f 361=item ->enqueue(LIST)
d21067e0 362
54c7876f 363Adds a list of items onto the end of the queue.
d21067e0 364
54c7876f 365=item ->dequeue()
a99072da 366
54c7876f 367=item ->dequeue(COUNT)
d21067e0 368
54c7876f
JH
369Removes the requested number of items (default is 1) from the head of the
370queue, and returns them. If the queue contains fewer than the requested
371number of items, then the thread will be blocked until the requisite number
372of items are available (i.e., until other threads <enqueue> more items).
a99072da 373
54c7876f
JH
374=item ->dequeue_nb()
375
376=item ->dequeue_nb(COUNT)
377
378Removes the requested number of items (default is 1) from the head of the
379queue, and returns them. If the queue contains fewer than the requested
380number of items, then it immediately (i.e., non-blocking) returns whatever
381items there are on the queue. If the queue is empty, then C<undef> is
382returned.
383
384=item ->pending()
385
1fd4700e
JH
386Returns the number of items still in the queue. Returns C<undef> if the queue
387has been ended (see below), and there are no more items in the queue.
388
389=item ->end()
390
391Declares that no more items will be added to the queue.
392
393All threads blocking on C<dequeue()> calls will be unblocked with any
394remaining items in the queue and/or C<undef> being returned. Any subsequent
395calls to C<dequeue()> will behave like C<dequeue_nb()>.
396
397Once ended, no more items may be placed in the queue.
54c7876f
JH
398
399=back
400
401=head1 ADVANCED METHODS
402
403The following methods can be used to manipulate items anywhere in a queue.
404
405To prevent the contents of a queue from being modified by another thread
406while it is being examined and/or changed, L<lock|threads::shared/"lock
407VARIABLE"> the queue inside a local block:
408
409 {
410 lock($q); # Keep other threads from changing the queue's contents
411 my $item = $q->peek();
412 if ($item ...) {
413 ...
414 }
415 }
416 # Queue is now unlocked
417
418=over
419
420=item ->peek()
421
422=item ->peek(INDEX)
423
424Returns an item from the queue without dequeuing anything. Defaults to the
425the head of queue (at index position 0) if no index is specified. Negative
426index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
427is the end of the queue, -2 is next to last, and so on).
428
429If no items exists at the specified index (i.e., the queue is empty, or the
430index is beyond the number of items on the queue), then C<undef> is returned.
431
432Remember, the returned item is not removed from the queue, so manipulating a
433C<peek>ed at reference affects the item on the queue.
434
435=item ->insert(INDEX, LIST)
436
437Adds the list of items to the queue at the specified index position (0
438is the head of the list). Any existing items at and beyond that position are
439pushed back past the newly added items:
440
441 $q->enqueue(1, 2, 3, 4);
442 $q->insert(1, qw/foo bar/);
443 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 444
54c7876f
JH
445Specifying an index position greater than the number of items in the queue
446just adds the list to the end.
83272a45 447
54c7876f
JH
448Negative index positions are supported:
449
450 $q->enqueue(1, 2, 3, 4);
451 $q->insert(-2, qw/foo bar/);
452 # Queue now contains: 1, 2, foo, bar, 3, 4
453
454Specifying a negative index position greater than the number of items in the
455queue adds the list to the head of the queue.
456
457=item ->extract()
458
459=item ->extract(INDEX)
460
461=item ->extract(INDEX, COUNT)
462
463Removes and returns the specified number of items (defaults to 1) from the
464specified index position in the queue (0 is the head of the queue). When
465called with no arguments, C<extract> operates the same as C<dequeue_nb>.
466
467This method is non-blocking, and will return only as many items as are
468available to fulfill the request:
469
470 $q->enqueue(1, 2, 3, 4);
471 my $item = $q->extract(2) # Returns 3
472 # Queue now contains: 1, 2, 4
473 my @items = $q->extract(1, 3) # Returns (2, 4)
474 # Queue now contains: 1
475
476Specifying an index position greater than the number of items in the
477queue results in C<undef> or an empty list being returned.
478
479 $q->enqueue('foo');
480 my $nada = $q->extract(3) # Returns undef
481 my @nada = $q->extract(1, 3) # Returns ()
482
483Negative index positions are supported. Specifying a negative index position
484greater than the number of items in the queue may return items from the head
485of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
486queue from the specified position (i.e. if queue size + index + count is
487greater than zero):
488
489 $q->enqueue(qw/foo bar baz/);
490 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
491 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
492 # Queue now contains: bar, baz
493 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
494
495=back
496
7fb1c73b
JH
497=head1 NOTES
498
499Queues created by L<Thread::Queue> can be used in both threaded and
500non-threaded applications.
501
54c7876f
JH
502=head1 LIMITATIONS
503
504Passing objects on queues may not work if the objects' classes do not support
505sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
506
507Passing array/hash refs that contain objects may not work for Perl prior to
5085.10.0.
509
510=head1 SEE ALSO
511
512Thread::Queue Discussion Forum on CPAN:
513L<http://www.cpanforum.com/dist/Thread-Queue>
514
54c7876f
JH
515L<threads>, L<threads::shared>
516
1fd4700e
JH
517Sample code in the I<examples> directory of this distribution on CPAN.
518
54c7876f
JH
519=head1 MAINTAINER
520
521Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
522
523=head1 LICENSE
524
525This program is free software; you can redistribute it and/or modify it under
526the same terms as Perl itself.
527
528=cut