This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.06
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
e128eaa1 6our $VERSION = '3.06';
23e2fda9 7$VERSION = eval $VERSION;
54c7876f 8
09782346 9use threads::shared 1.21;
ac9d3a9d 10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
54c7876f 11
09782346
JH
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
54c7876f
JH
# Constructor: build a queue object, optionally seeded with a list of items.
#   Args:    class name, followed by zero or more initial items
#   Returns: a hash-based object blessed into the given class
sub new
{
    my ($class, @initial) = @_;

    # Every initial item is passed through shared_clone() before storage
    my @items :shared = map { shared_clone($_) } @initial;

    # The object itself is a shared hash that refers to the item array
    my %object :shared = ('queue' => \@items);
    return bless(\%object, $class);
}
23
# Append items to the tail of the queue.
#   Args:   the queue object, followed by the items to add
#   Croaks: if the queue has already been marked as ended
#   Blocks: while a true 'LIMIT' is set and the queue already holds at
#           least that many items
sub enqueue
{
    my ($self, @new_items) = @_;
    lock(%$self);

    # An ended queue accepts no further items
    if ($self->{'ENDED'}) {
        require Carp;
        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
    }

    my $items = $self->{'queue'};

    # Hold off until the queue has dropped below the limit (if there is one)
    while ($self->{'LIMIT'} && (@$items >= $self->{'LIMIT'})) {
        cond_wait(%$self);
    }

    # Append the (cloned) items; signal a waiter when push() reports a
    # non-zero queue length
    push(@$items, map { shared_clone($_) } @new_items)
        and cond_signal(%$self);
}
43
e128eaa1
JH
# Lvalue accessor for the queue's maximum size ('LIMIT' slot of the object).
# Read it with $q->limit, or assign to it with $q->limit = N.
# NOTE(review): the lock taken here presumably ends when this sub returns,
# i.e. before a caller's assignment through the lvalue happens -- confirm.
sub limit : lvalue
{
    my ($self) = @_;
    lock(%$self);
    $self->{'LIMIT'};
}
51
54c7876f
JH
# Report how many items are currently on the queue.
#   Returns: the item count; or nothing (undef in scalar context) when the
#            queue has been ended and no items remain
sub pending
{
    my ($self) = @_;
    lock(%$self);

    my $remaining = scalar(@{$self->{'queue'}});

    # An ended, drained queue is reported as "nothing" rather than 0
    return if ($self->{'ENDED'} && ($remaining == 0));

    return $remaining;
}
60
# Indicate that no more data will enter the queue.
#   Sets the 'ENDED' flag on the queue object and wakes every thread that is
#   waiting on the queue so that each one re-evaluates its wait condition.
sub end
{
    my $self = shift;
    lock(%$self);

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Wake all waiters, not just one.  A single cond_signal() could be
    # consumed by a thread blocked in enqueue() on the queue limit; that
    # thread goes straight back to waiting without signalling anyone else,
    # which would leave threads blocked in dequeue() asleep.
    cond_broadcast(%$self);
}
71
# Return 1 or more items from the head of a queue, blocking if needed.
#   Args:    optional COUNT (validated via _validate_count; defaults to 1)
#   Returns: a single item when COUNT is 1, otherwise a list of COUNT items.
#            Once the queue has been ended, the result is whatever
#            dequeue_nb() returns for the same COUNT.
sub dequeue
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items
    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});

    # Always pass a wake-up along.  Items are about to be removed, which may
    # bring the queue below its limit, so a thread blocked in enqueue() must
    # get a chance to re-check -- even when this call drains the queue
    # exactly.  (Signalling only when items are left over misses that case.)
    # The lock is still held here, so a woken thread cannot proceed until
    # this method has finished with the queue.
    cond_signal(%$self);

    # If no longer blocking, try getting whatever is left on the queue
    return $self->dequeue_nb($count) if ($$self{'ENDED'});

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    push(@items, shift(@$queue)) for (1..$count);
    return @items;
}
96
# Return items from the head of a queue with no blocking.
#   Args:    optional COUNT (validated via _validate_count; defaults to 1)
#   Returns: a single item (undef if the queue is empty) when COUNT is 1,
#            otherwise a list of up to COUNT items
sub dequeue_nb
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Return single item
    if ($count == 1) {
        my $had_item = @$queue;
        my $item = shift(@$queue);
        # Removing an item may bring the queue below its limit, so give a
        # thread blocked in enqueue() the chance to re-check
        cond_signal(%$self) if ($had_item);
        return $item;
    }

    # Return multiple items
    my @items;
    for (1..$count) {
        last if (! @$queue);
        push(@items, shift(@$queue));
    }

    # Same reasoning as above: wake a waiter whenever something was removed
    cond_signal(%$self) if (@items);

    return @items;
}
117
c7bac10a
JH
# Return items from the head of a queue, blocking if needed up to a timeout.
#   Args:    optional TIMEOUT (validated via _validate_timeout when defined),
#            optional COUNT (validated via _validate_count; defaults to 1)
#   Returns: whatever dequeue_nb() returns for COUNT once the items are
#            available, the queue has been ended, or the timeout has passed
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # Timeout may be relative or absolute.  A missing or undefined timeout
    # means "do not wait at all" (the documented dequeue_nb-like behaviour),
    # so an explicit undef is consumed here instead of being rejected.
    my $timeout = -1;
    if (@_) {
        my $arg = shift;
        $timeout = $self->_validate_timeout($arg) if (defined($arg));
    }
    # Convert to an absolute time for use with cond_timedwait().  Values
    # below 32,000,000 (roughly one year of seconds) are taken to be relative
    # to now; anything larger is used as an absolute time as-is.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }

    # Pass a wake-up along whenever items are about to be removed (this may
    # bring the queue below its limit and so unblock an enqueue() call), or
    # when the queue has been ended.  The lock is still held, so a woken
    # thread cannot proceed until this method has finished with the queue.
    cond_signal(%$self) if (@$queue || $$self{'ENDED'});

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}
143
54c7876f
JH
# Look at an item on the queue without removing it.
#   Args:    optional INDEX (validated via _validate_index; defaults to 0)
#   Returns: the element stored at that index of the underlying array
sub peek
{
    my ($self, @args) = @_;
    lock(%$self);

    # Head of the queue unless the caller asked for a specific position
    my $position = 0;
    $position = $self->_validate_index($args[0]) if (@args);

    return $self->{'queue'}->[$position];
}
152
# Insert items at an arbitrary position in the queue.
#   Args:   INDEX (required; validated via _validate_index), followed by the
#           items to insert
#   Croaks: if the queue has already been marked as ended
# Does nothing further when no items are supplied.
sub insert
{
    my ($self, @args) = @_;
    lock(%$self);

    # An ended queue accepts no further items
    if ($self->{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $items = $self->{'queue'};

    # The index is validated even when there turns out to be nothing to insert
    my $position = $self->_validate_index(shift(@args));

    return if (! @args);

    # A negative index counts back from the tail; clamp at the head
    if ($position < 0) {
        $position += @$items;
        $position = 0 if ($position < 0);
    }

    # Set aside everything from the insertion point onward, keeping its order
    my @tail;
    unshift(@tail, pop(@$items)) while (@$items > $position);

    # Append the new (cloned) items, then restore what was set aside
    push(@$items, map { shared_clone($_) } @args);
    push(@$items, @tail);

    # Let a waiting thread know the queue has changed
    cond_signal(%$self);
}
193
# Remove items from anywhere in a queue, without blocking.
#   Args:    optional INDEX (validated via _validate_index; defaults to 0),
#            optional COUNT (validated via _validate_count; defaults to 1)
#   Returns: a single item (possibly undef) when COUNT is 1, otherwise a
#            list of the items found in the requested range
sub extract
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $index = @_ ? $self->_validate_index(shift) : 0;
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The range starts before the head; keep only the part that
            # overlaps the queue
            $count += $index;
            return if ($count <= 0);            # Beyond the head of the queue

            # Items may be removed below, which can bring the queue under
            # its limit: give a thread blocked in enqueue() the chance to
            # re-check.  The lock is still held, so it cannot proceed until
            # this method has finished with the queue.
            cond_signal(%$self) if (@$queue);
            return $self->dequeue_nb($count);   # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue));
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    # Wake a waiter whenever something was actually removed (see above)
    cond_signal(%$self) if (@items);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}
233
9df8b036 234### Internal Methods ###
54c7876f 235
# Check value of the requested index.
#   Args:    the queue object, and the index to check
#   Returns: the index unchanged when it is a defined, integer-valued number
#   Croaks:  otherwise, naming the method that called this validator
sub _validate_index
{
    my $self = shift;
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # Report the bare method name.  Strip whatever package qualifier
        # caller() supplied rather than matching on ref($self): for an
        # instance of a subclass the two differ and the prefix would be
        # left in the message.
        $method =~ s/\A.*:://s;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
}
256
# Check value of the requested count.
#   Args:    the queue object, and the count to check
#   Returns: the count unchanged when it is a defined, integer-valued number
#            that is at least 1
#   Croaks:  otherwise, naming the method that called this validator
sub _validate_count
{
    my $self = shift;
    my $count = shift;

    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # Report the bare method name.  Strip whatever package qualifier
        # caller() supplied rather than matching on ref($self): for an
        # instance of a subclass the two differ and the prefix would be
        # left in the message.
        $method =~ s/\A.*:://s;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
}
278
# Check value of the requested timeout.
#   Args:    the queue object, and the timeout to check
#   Returns: the timeout unchanged when it is a defined number
#   Croaks:  otherwise, naming the method that called this validator
sub _validate_timeout
{
    my $self = shift;
    my $timeout = shift;

    if (! defined($timeout) ||
        ! looks_like_number($timeout))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # Report the bare method name.  Strip whatever package qualifier
        # caller() supplied rather than matching on ref($self): for an
        # instance of a subclass the two differ and the prefix would be
        # left in the message.
        $method =~ s/\A.*:://s;
        $timeout = 'undef' if (! defined($timeout));
        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}
298
54c7876f 2991;
9c6f8578 300
d516a115
JH
301=head1 NAME
302
54c7876f
JH
303Thread::Queue - Thread-safe queues
304
305=head1 VERSION
306
e128eaa1 307This document describes Thread::Queue version 3.06
d516a115
JH
308
309=head1 SYNOPSIS
310
54c7876f
JH
311 use strict;
312 use warnings;
313
314 use threads;
d516a115 315 use Thread::Queue;
54c7876f
JH
316
317 my $q = Thread::Queue->new(); # A new empty queue
318
319 # Worker thread
1fd4700e
JH
320 my $thr = threads->create(
321 sub {
322 # Thread will loop until no more work
323 while (defined(my $item = $q->dequeue())) {
324 # Do work on $item
325 ...
326 }
327 }
328 );
54c7876f
JH
329
330 # Send work to the thread
331 $q->enqueue($item1, ...);
1fd4700e
JH
332 # Signal that there is no more work to be sent
333 $q->end();
334 # Join up with the thread when it finishes
335 $thr->join();
54c7876f 336
1fd4700e 337 ...
54c7876f
JH
338
339 # Count of items in the queue
340 my $left = $q->pending();
341
342 # Non-blocking dequeue
343 if (defined(my $item = $q->dequeue_nb())) {
344 # Work on $item
345 }
346
c7bac10a
JH
347 # Blocking dequeue with 5-second timeout
348 if (defined(my $item = $q->dequeue_timed(5))) {
349 # Work on $item
350 }
351
e128eaa1
JH
352 # Set a size for a queue
353 $q->limit = 5;
354
54c7876f
JH
355 # Get the second item in the queue without dequeuing anything
356 my $item = $q->peek(1);
357
358 # Insert two items into the queue just behind the head
359 $q->insert(1, $item1, $item2);
360
361 # Extract the last two items on the queue
362 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 363
a99072da
HM
364=head1 DESCRIPTION
365
54c7876f
JH
366This module provides thread-safe FIFO queues that can be accessed safely by
367any number of threads.
368
369Any data types supported by L<threads::shared> can be passed via queues:
370
371=over
372
373=item Ordinary scalars
374
375=item Array refs
376
377=item Hash refs
378
379=item Scalar refs
380
381=item Objects based on the above
382
383=back
384
385Ordinary scalars are added to queues as they are.
386
387If not already thread-shared, the other complex data types will be cloned
388(recursively, if needed, and including any C<bless>ings and read-only
389settings) into thread-shared structures before being placed onto a queue.
a99072da 390
54c7876f
JH
For example, the following would cause L<Thread::Queue> to create an empty,
392shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
393and 'baz' from C<@ary> into it, and then place that shared reference onto
394the queue:
a99072da 395
54c7876f
JH
396 my @ary = qw/foo bar baz/;
397 $q->enqueue(\@ary);
a99072da 398
54c7876f
JH
399However, for the following, the items are already shared, so their references
400are added directly to the queue, and no cloning takes place:
a99072da 401
54c7876f
JH
402 my @ary :shared = qw/foo bar baz/;
403 $q->enqueue(\@ary);
a99072da 404
54c7876f
JH
405 my $obj = &shared({});
406 $$obj{'foo'} = 'bar';
407 $$obj{'qux'} = 99;
408 bless($obj, 'My::Class');
409 $q->enqueue($obj);
a99072da 410
54c7876f 411See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 412
54c7876f 413=head1 QUEUE CREATION
a99072da 414
54c7876f 415=over
a99072da 416
54c7876f 417=item ->new()
a99072da 418
54c7876f 419Creates a new empty queue.
a99072da 420
54c7876f 421=item ->new(LIST)
a99072da 422
54c7876f 423Creates a new queue pre-populated with the provided list of items.
a99072da
HM
424
425=back
426
54c7876f 427=head1 BASIC METHODS
a99072da 428
54c7876f 429The following methods deal with queues on a FIFO basis.
bbc7dcd2 430
54c7876f 431=over
d516a115 432
54c7876f 433=item ->enqueue(LIST)
d21067e0 434
54c7876f 435Adds a list of items onto the end of the queue.
d21067e0 436
54c7876f 437=item ->dequeue()
a99072da 438
54c7876f 439=item ->dequeue(COUNT)
d21067e0 440
54c7876f
JH
441Removes the requested number of items (default is 1) from the head of the
442queue, and returns them. If the queue contains fewer than the requested
443number of items, then the thread will be blocked until the requisite number
e128eaa1 444of items are available (i.e., until other threads C<enqueue> more items).
a99072da 445
54c7876f
JH
446=item ->dequeue_nb()
447
448=item ->dequeue_nb(COUNT)
449
450Removes the requested number of items (default is 1) from the head of the
451queue, and returns them. If the queue contains fewer than the requested
452number of items, then it immediately (i.e., non-blocking) returns whatever
453items there are on the queue. If the queue is empty, then C<undef> is
454returned.
455
c7bac10a
JH
456=item ->dequeue_timed(TIMEOUT)
457
458=item ->dequeue_timed(TIMEOUT, COUNT)
459
460Removes the requested number of items (default is 1) from the head of the
461queue, and returns them. If the queue contains fewer than the requested
462number of items, then the thread will be blocked until the requisite number of
463items are available, or until the timeout is reached. If the timeout is
464reached, it returns whatever items there are on the queue, or C<undef> if the
465queue is empty.
466
467The timeout may be a number of seconds relative to the current time (e.g., 5
468seconds from when the call is made), or may be an absolute timeout in I<epoch>
469seconds the same as would be used with
470L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
471Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
472the underlying implementation).
473
9df8b036 474If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
c7bac10a
JH
475behaves the same as C<dequeue_nb>.
476
54c7876f
JH
477=item ->pending()
478
1fd4700e
JH
479Returns the number of items still in the queue. Returns C<undef> if the queue
480has been ended (see below), and there are no more items in the queue.
481
e128eaa1
JH
482=item ->limit
483
484Sets the size of the queue. If set, calls to C<enqueue()> will block until
485the number of pending items in the queue drops below the C<limit>. The
486C<limit> does not prevent enqueuing items beyond that count:
487
488 my $q = Thread::Queue->new(1, 2);
489 $q->limit = 4;
490 $q->enqueue(3, 4, 5); # Does not block
491 $q->enqueue(6); # Blocks until at least 2 items are dequeued
492
493 my $size = $q->limit; # Returns the current limit (may return 'undef')
494 $q->limit = 0; # Queue size is now unlimited
495
1fd4700e
JH
496=item ->end()
497
498Declares that no more items will be added to the queue.
499
500All threads blocking on C<dequeue()> calls will be unblocked with any
501remaining items in the queue and/or C<undef> being returned. Any subsequent
502calls to C<dequeue()> will behave like C<dequeue_nb()>.
503
504Once ended, no more items may be placed in the queue.
54c7876f
JH
505
506=back
507
508=head1 ADVANCED METHODS
509
510The following methods can be used to manipulate items anywhere in a queue.
511
512To prevent the contents of a queue from being modified by another thread
513while it is being examined and/or changed, L<lock|threads::shared/"lock
514VARIABLE"> the queue inside a local block:
515
516 {
517 lock($q); # Keep other threads from changing the queue's contents
518 my $item = $q->peek();
519 if ($item ...) {
520 ...
521 }
522 }
523 # Queue is now unlocked
524
525=over
526
527=item ->peek()
528
529=item ->peek(INDEX)
530
Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
533index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
534is the end of the queue, -2 is next to last, and so on).
535
If no item exists at the specified index (i.e., the queue is empty, or the
537index is beyond the number of items on the queue), then C<undef> is returned.
538
539Remember, the returned item is not removed from the queue, so manipulating a
540C<peek>ed at reference affects the item on the queue.
541
542=item ->insert(INDEX, LIST)
543
544Adds the list of items to the queue at the specified index position (0
545is the head of the list). Any existing items at and beyond that position are
546pushed back past the newly added items:
547
548 $q->enqueue(1, 2, 3, 4);
549 $q->insert(1, qw/foo bar/);
550 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 551
54c7876f
JH
552Specifying an index position greater than the number of items in the queue
553just adds the list to the end.
83272a45 554
54c7876f
JH
555Negative index positions are supported:
556
557 $q->enqueue(1, 2, 3, 4);
558 $q->insert(-2, qw/foo bar/);
559 # Queue now contains: 1, 2, foo, bar, 3, 4
560
561Specifying a negative index position greater than the number of items in the
562queue adds the list to the head of the queue.
563
564=item ->extract()
565
566=item ->extract(INDEX)
567
568=item ->extract(INDEX, COUNT)
569
570Removes and returns the specified number of items (defaults to 1) from the
571specified index position in the queue (0 is the head of the queue). When
572called with no arguments, C<extract> operates the same as C<dequeue_nb>.
573
574This method is non-blocking, and will return only as many items as are
575available to fulfill the request:
576
    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);     # Returns 3
                                    # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);  # Returns (2, 4)
                                    # Queue now contains:  1
582
583Specifying an index position greater than the number of items in the
584queue results in C<undef> or an empty list being returned.
585
    $q->enqueue('foo');
    my $nada = $q->extract(3);      # Returns undef
    my @nada = $q->extract(1, 3);   # Returns ()
589
590Negative index positions are supported. Specifying a negative index position
591greater than the number of items in the queue may return items from the head
592of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
593queue from the specified position (i.e. if queue size + index + count is
594greater than zero):
595
596 $q->enqueue(qw/foo bar baz/);
597 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
598 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
599 # Queue now contains: bar, baz
600 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
601
602=back
603
7fb1c73b
JH
604=head1 NOTES
605
606Queues created by L<Thread::Queue> can be used in both threaded and
607non-threaded applications.
608
54c7876f
JH
609=head1 LIMITATIONS
610
611Passing objects on queues may not work if the objects' classes do not support
612sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
613
614Passing array/hash refs that contain objects may not work for Perl prior to
6155.10.0.
616
617=head1 SEE ALSO
618
619Thread::Queue Discussion Forum on CPAN:
620L<http://www.cpanforum.com/dist/Thread-Queue>
621
54c7876f
JH
622L<threads>, L<threads::shared>
623
1fd4700e
JH
624Sample code in the I<examples> directory of this distribution on CPAN.
625
54c7876f
JH
626=head1 MAINTAINER
627
628Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
629
630=head1 LICENSE
631
632This program is free software; you can redistribute it and/or modify it under
633the same terms as Perl itself.
634
635=cut