This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
316644a64f361c7f9ae46da1e6d1cbba22041dfa
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '3.05';
7 $VERSION = eval $VERSION;
8
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
14
15 # Create a new queue possibly pre-populated with items
16 sub new
17 {
18     my $class = shift;
19     my @queue :shared = map { shared_clone($_) } @_;
20     my %self :shared = ( 'queue' => \@queue );
21     return bless(\%self, $class);
22 }
23
24 # Add items to the tail of a queue
25 sub enqueue
26 {
27     my $self = shift;
28     lock(%$self);
29     if ($$self{'ENDED'}) {
30         require Carp;
31         Carp::croak("'enqueue' method called on queue that has been 'end'ed");
32     }
33     push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
34         and cond_signal(%$self);
35 }
36
37 # Return a count of the number of items on a queue
38 sub pending
39 {
40     my $self = shift;
41     lock(%$self);
42     return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
43     return scalar(@{$$self{'queue'}});
44 }
45
46 # Indicate that no more data will enter the queue
47 sub end
48 {
49     my $self = shift;
50     lock $self;
51     # No more data is coming
52     $$self{'ENDED'} = 1;
53     # Try to release at least one blocked thread
54     cond_signal(%$self);
55 }
56
57 # Return 1 or more items from the head of a queue, blocking if needed
58 sub dequeue
59 {
60     my $self = shift;
61     lock(%$self);
62     my $queue = $$self{'queue'};
63
64     my $count = @_ ? $self->_validate_count(shift) : 1;
65
66     # Wait for requisite number of items
67     cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
68     cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
69
70     # If no longer blocking, try getting whatever is left on the queue
71     return $self->dequeue_nb($count) if ($$self{'ENDED'});
72
73     # Return single item
74     return shift(@$queue) if ($count == 1);
75
76     # Return multiple items
77     my @items;
78     push(@items, shift(@$queue)) for (1..$count);
79     return @items;
80 }
81
82 # Return items from the head of a queue with no blocking
83 sub dequeue_nb
84 {
85     my $self = shift;
86     lock(%$self);
87     my $queue = $$self{'queue'};
88
89     my $count = @_ ? $self->_validate_count(shift) : 1;
90
91     # Return single item
92     return shift(@$queue) if ($count == 1);
93
94     # Return multiple items
95     my @items;
96     for (1..$count) {
97         last if (! @$queue);
98         push(@items, shift(@$queue));
99     }
100     return @items;
101 }
102
103 # Return items from the head of a queue, blocking if needed up to a timeout
104 sub dequeue_timed
105 {
106     my $self = shift;
107     lock(%$self);
108     my $queue = $$self{'queue'};
109
110     # Timeout may be relative or absolute
111     my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
112     # Convert to an absolute time for use with cond_timedwait()
113     if ($timeout < 32000000) {   # More than one year
114         $timeout += time();
115     }
116
117     my $count = @_ ? $self->_validate_count(shift) : 1;
118
119     # Wait for requisite number of items, or until timeout
120     while ((@$queue < $count) && ! $$self{'ENDED'}) {
121         last if (! cond_timedwait(%$self, $timeout));
122     }
123     cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
124
125     # Get whatever we need off the queue if available
126     return $self->dequeue_nb($count);
127 }
128
129 # Return an item without removing it from a queue
130 sub peek
131 {
132     my $self = shift;
133     lock(%$self);
134     my $index = @_ ? $self->_validate_index(shift) : 0;
135     return $$self{'queue'}[$index];
136 }
137
138 # Insert items anywhere into a queue
139 sub insert
140 {
141     my $self = shift;
142     lock(%$self);
143
144     if ($$self{'ENDED'}) {
145         require Carp;
146         Carp::croak("'insert' method called on queue that has been 'end'ed");
147     }
148
149     my $queue = $$self{'queue'};
150
151     my $index = $self->_validate_index(shift);
152
153     return if (! @_);   # Nothing to insert
154
155     # Support negative indices
156     if ($index < 0) {
157         $index += @$queue;
158         if ($index < 0) {
159             $index = 0;
160         }
161     }
162
163     # Dequeue items from $index onward
164     my @tmp;
165     while (@$queue > $index) {
166         unshift(@tmp, pop(@$queue))
167     }
168
169     # Add new items to the queue
170     push(@$queue, map { shared_clone($_) } @_);
171
172     # Add previous items back onto the queue
173     push(@$queue, @tmp);
174
175     # Soup's up
176     cond_signal(%$self);
177 }
178
179 # Remove items from anywhere in a queue
180 sub extract
181 {
182     my $self = shift;
183     lock(%$self);
184     my $queue = $$self{'queue'};
185
186     my $index = @_ ? $self->_validate_index(shift) : 0;
187     my $count = @_ ? $self->_validate_count(shift) : 1;
188
189     # Support negative indices
190     if ($index < 0) {
191         $index += @$queue;
192         if ($index < 0) {
193             $count += $index;
194             return if ($count <= 0);            # Beyond the head of the queue
195             return $self->dequeue_nb($count);  # Extract from the head
196         }
197     }
198
199     # Dequeue items from $index+$count onward
200     my @tmp;
201     while (@$queue > ($index+$count)) {
202         unshift(@tmp, pop(@$queue))
203     }
204
205     # Extract desired items
206     my @items;
207     unshift(@items, pop(@$queue)) while (@$queue > $index);
208
209     # Add back any removed items
210     push(@$queue, @tmp);
211
212     # Return single item
213     return $items[0] if ($count == 1);
214
215     # Return multiple items
216     return @items;
217 }
218
219 ### Internal Methods ###
220
221 # Check value of the requested index
222 sub _validate_index
223 {
224     my $self = shift;
225     my $index = shift;
226
227     if (! defined($index) ||
228         ! looks_like_number($index) ||
229         (int($index) != $index))
230     {
231         require Carp;
232         my ($method) = (caller(1))[3];
233         my $class_name = ref($self);
234         $method =~ s/$class_name\:://;
235         $index = 'undef' if (! defined($index));
236         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
237     }
238
239     return $index;
240 };
241
242 # Check value of the requested count
243 sub _validate_count
244 {
245     my $self = shift;
246     my $count = shift;
247
248     if (! defined($count) ||
249         ! looks_like_number($count) ||
250         (int($count) != $count) ||
251         ($count < 1))
252     {
253         require Carp;
254         my ($method) = (caller(1))[3];
255         my $class_name = ref($self);
256         $method =~ s/$class_name\:://;
257         $count = 'undef' if (! defined($count));
258         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
259     }
260
261     return $count;
262 };
263
264 # Check value of the requested timeout
265 sub _validate_timeout
266 {
267     my $self = shift;
268     my $timeout = shift;
269
270     if (! defined($timeout) ||
271         ! looks_like_number($timeout))
272     {
273         require Carp;
274         my ($method) = (caller(1))[3];
275         my $class_name = ref($self);
276         $method =~ s/$class_name\:://;
277         $timeout = 'undef' if (! defined($timeout));
278         Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
279     }
280
281     return $timeout;
282 };
283
284 1;
285
286 =head1 NAME
287
288 Thread::Queue - Thread-safe queues
289
290 =head1 VERSION
291
292 This document describes Thread::Queue version 3.05
293
294 =head1 SYNOPSIS
295
296     use strict;
297     use warnings;
298
299     use threads;
300     use Thread::Queue;
301
302     my $q = Thread::Queue->new();    # A new empty queue
303
304     # Worker thread
305     my $thr = threads->create(
306         sub {
307             # Thread will loop until no more work
308             while (defined(my $item = $q->dequeue())) {
309                 # Do work on $item
310                 ...
311             }
312         }
313     );
314
315     # Send work to the thread
316     $q->enqueue($item1, ...);
317     # Signal that there is no more work to be sent
318     $q->end();
319     # Join up with the thread when it finishes
320     $thr->join();
321
322     ...
323
324     # Count of items in the queue
325     my $left = $q->pending();
326
327     # Non-blocking dequeue
328     if (defined(my $item = $q->dequeue_nb())) {
329         # Work on $item
330     }
331
332     # Blocking dequeue with 5-second timeout
333     if (defined(my $item = $q->dequeue_timed(5))) {
334         # Work on $item
335     }
336
337     # Get the second item in the queue without dequeuing anything
338     my $item = $q->peek(1);
339
340     # Insert two items into the queue just behind the head
341     $q->insert(1, $item1, $item2);
342
343     # Extract the last two items on the queue
344     my ($item1, $item2) = $q->extract(-2, 2);
345
346 =head1 DESCRIPTION
347
348 This module provides thread-safe FIFO queues that can be accessed safely by
349 any number of threads.
350
351 Any data types supported by L<threads::shared> can be passed via queues:
352
353 =over
354
355 =item Ordinary scalars
356
357 =item Array refs
358
359 =item Hash refs
360
361 =item Scalar refs
362
363 =item Objects based on the above
364
365 =back
366
367 Ordinary scalars are added to queues as they are.
368
369 If not already thread-shared, the other complex data types will be cloned
370 (recursively, if needed, and including any C<bless>ings and read-only
371 settings) into thread-shared structures before being placed onto a queue.
372
373 For example, the following would cause L<Thread::Queue> to create a empty,
374 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
375 and 'baz' from C<@ary> into it, and then place that shared reference onto
376 the queue:
377
378     my @ary = qw/foo bar baz/;
379     $q->enqueue(\@ary);
380
381 However, for the following, the items are already shared, so their references
382 are added directly to the queue, and no cloning takes place:
383
384     my @ary :shared = qw/foo bar baz/;
385     $q->enqueue(\@ary);
386
387     my $obj = &shared({});
388     $$obj{'foo'} = 'bar';
389     $$obj{'qux'} = 99;
390     bless($obj, 'My::Class');
391     $q->enqueue($obj);
392
393 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
394
395 =head1 QUEUE CREATION
396
397 =over
398
399 =item ->new()
400
401 Creates a new empty queue.
402
403 =item ->new(LIST)
404
405 Creates a new queue pre-populated with the provided list of items.
406
407 =back
408
409 =head1 BASIC METHODS
410
411 The following methods deal with queues on a FIFO basis.
412
413 =over
414
415 =item ->enqueue(LIST)
416
417 Adds a list of items onto the end of the queue.
418
419 =item ->dequeue()
420
421 =item ->dequeue(COUNT)
422
423 Removes the requested number of items (default is 1) from the head of the
424 queue, and returns them.  If the queue contains fewer than the requested
425 number of items, then the thread will be blocked until the requisite number
426 of items are available (i.e., until other threads <enqueue> more items).
427
428 =item ->dequeue_nb()
429
430 =item ->dequeue_nb(COUNT)
431
432 Removes the requested number of items (default is 1) from the head of the
433 queue, and returns them.  If the queue contains fewer than the requested
434 number of items, then it immediately (i.e., non-blocking) returns whatever
435 items there are on the queue.  If the queue is empty, then C<undef> is
436 returned.
437
438 =item ->dequeue_timed(TIMEOUT)
439
440 =item ->dequeue_timed(TIMEOUT, COUNT)
441
442 Removes the requested number of items (default is 1) from the head of the
443 queue, and returns them.  If the queue contains fewer than the requested
444 number of items, then the thread will be blocked until the requisite number of
445 items are available, or until the timeout is reached.  If the timeout is
446 reached, it returns whatever items there are on the queue, or C<undef> if the
447 queue is empty.
448
449 The timeout may be a number of seconds relative to the current time (e.g., 5
450 seconds from when the call is made), or may be an absolute timeout in I<epoch>
451 seconds the same as would be used with
452 L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
453 Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
454 the underlying implementation).
455
456 If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
457 behaves the same as C<dequeue_nb>.
458
459 =item ->pending()
460
461 Returns the number of items still in the queue.  Returns C<undef> if the queue
462 has been ended (see below), and there are no more items in the queue.
463
464 =item ->end()
465
466 Declares that no more items will be added to the queue.
467
468 All threads blocking on C<dequeue()> calls will be unblocked with any
469 remaining items in the queue and/or C<undef> being returned.  Any subsequent
470 calls to C<dequeue()> will behave like C<dequeue_nb()>.
471
472 Once ended, no more items may be placed in the queue.
473
474 =back
475
476 =head1 ADVANCED METHODS
477
478 The following methods can be used to manipulate items anywhere in a queue.
479
480 To prevent the contents of a queue from being modified by another thread
481 while it is being examined and/or changed, L<lock|threads::shared/"lock
482 VARIABLE"> the queue inside a local block:
483
484     {
485         lock($q);   # Keep other threads from changing the queue's contents
486         my $item = $q->peek();
487         if ($item ...) {
488             ...
489         }
490     }
491     # Queue is now unlocked
492
493 =over
494
495 =item ->peek()
496
497 =item ->peek(INDEX)
498
499 Returns an item from the queue without dequeuing anything.  Defaults to the
500 the head of queue (at index position 0) if no index is specified.  Negative
501 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
502 is the end of the queue, -2 is next to last, and so on).
503
504 If no items exists at the specified index (i.e., the queue is empty, or the
505 index is beyond the number of items on the queue), then C<undef> is returned.
506
507 Remember, the returned item is not removed from the queue, so manipulating a
508 C<peek>ed at reference affects the item on the queue.
509
510 =item ->insert(INDEX, LIST)
511
512 Adds the list of items to the queue at the specified index position (0
513 is the head of the list).  Any existing items at and beyond that position are
514 pushed back past the newly added items:
515
516     $q->enqueue(1, 2, 3, 4);
517     $q->insert(1, qw/foo bar/);
518     # Queue now contains:  1, foo, bar, 2, 3, 4
519
520 Specifying an index position greater than the number of items in the queue
521 just adds the list to the end.
522
523 Negative index positions are supported:
524
525     $q->enqueue(1, 2, 3, 4);
526     $q->insert(-2, qw/foo bar/);
527     # Queue now contains:  1, 2, foo, bar, 3, 4
528
529 Specifying a negative index position greater than the number of items in the
530 queue adds the list to the head of the queue.
531
532 =item ->extract()
533
534 =item ->extract(INDEX)
535
536 =item ->extract(INDEX, COUNT)
537
538 Removes and returns the specified number of items (defaults to 1) from the
539 specified index position in the queue (0 is the head of the queue).  When
540 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
541
542 This method is non-blocking, and will return only as many items as are
543 available to fulfill the request:
544
545     $q->enqueue(1, 2, 3, 4);
546     my $item  = $q->extract(2)     # Returns 3
547                                    # Queue now contains:  1, 2, 4
548     my @items = $q->extract(1, 3)  # Returns (2, 4)
549                                    # Queue now contains:  1
550
551 Specifying an index position greater than the number of items in the
552 queue results in C<undef> or an empty list being returned.
553
554     $q->enqueue('foo');
555     my $nada = $q->extract(3)      # Returns undef
556     my @nada = $q->extract(1, 3)   # Returns ()
557
558 Negative index positions are supported.  Specifying a negative index position
559 greater than the number of items in the queue may return items from the head
560 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
561 queue from the specified position (i.e. if queue size + index + count is
562 greater than zero):
563
564     $q->enqueue(qw/foo bar baz/);
565     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
566     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
567                                      # Queue now contains:  bar, baz
568     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
569
570 =back
571
572 =head1 NOTES
573
574 Queues created by L<Thread::Queue> can be used in both threaded and
575 non-threaded applications.
576
577 =head1 LIMITATIONS
578
579 Passing objects on queues may not work if the objects' classes do not support
580 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
581
582 Passing array/hash refs that contain objects may not work for Perl prior to
583 5.10.0.
584
585 =head1 SEE ALSO
586
587 Thread::Queue Discussion Forum on CPAN:
588 L<http://www.cpanforum.com/dist/Thread-Queue>
589
590 L<threads>, L<threads::shared>
591
592 Sample code in the I<examples> directory of this distribution on CPAN.
593
594 =head1 MAINTAINER
595
596 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
597
598 =head1 LICENSE
599
600 This program is free software; you can redistribute it and/or modify it under
601 the same terms as Perl itself.
602
603 =cut