This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
svleak.t: Fix a mad failure
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.12';
7 $VERSION = eval $VERSION;
8
9 use threads::shared 1.21;
10 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12 # Carp errors from threads::shared calls should complain about caller
13 our @CARP_NOT = ("threads::shared");
14
15 # Predeclarations for internal functions
16 my ($validate_count, $validate_index);
17
18 # Create a new queue possibly pre-populated with items
19 sub new
20 {
21     my $class = shift;
22     my @queue :shared = map { shared_clone($_) } @_;
23     return bless(\@queue, $class);
24 }
25
26 # Add items to the tail of a queue
27 sub enqueue
28 {
29     my $queue = shift;
30     lock(@$queue);
31     push(@$queue, map { shared_clone($_) } @_)
32         and cond_signal(@$queue);
33 }
34
35 # Return a count of the number of items on a queue
36 sub pending
37 {
38     my $queue = shift;
39     lock(@$queue);
40     return scalar(@$queue);
41 }
42
43 # Return 1 or more items from the head of a queue, blocking if needed
44 sub dequeue
45 {
46     my $queue = shift;
47     lock(@$queue);
48
49     my $count = @_ ? $validate_count->(shift) : 1;
50
51     # Wait for requisite number of items
52     cond_wait(@$queue) until (@$queue >= $count);
53     cond_signal(@$queue) if (@$queue > $count);
54
55     # Return single item
56     return shift(@$queue) if ($count == 1);
57
58     # Return multiple items
59     my @items;
60     push(@items, shift(@$queue)) for (1..$count);
61     return @items;
62 }
63
64 # Return items from the head of a queue with no blocking
65 sub dequeue_nb
66 {
67     my $queue = shift;
68     lock(@$queue);
69
70     my $count = @_ ? $validate_count->(shift) : 1;
71
72     # Return single item
73     return shift(@$queue) if ($count == 1);
74
75     # Return multiple items
76     my @items;
77     for (1..$count) {
78         last if (! @$queue);
79         push(@items, shift(@$queue));
80     }
81     return @items;
82 }
83
84 # Return an item without removing it from a queue
85 sub peek
86 {
87     my $queue = shift;
88     lock(@$queue);
89     my $index = @_ ? $validate_index->(shift) : 0;
90     return $$queue[$index];
91 }
92
93 # Insert items anywhere into a queue
94 sub insert
95 {
96     my $queue = shift;
97     lock(@$queue);
98
99     my $index = $validate_index->(shift);
100
101     return if (! @_);   # Nothing to insert
102
103     # Support negative indices
104     if ($index < 0) {
105         $index += @$queue;
106         if ($index < 0) {
107             $index = 0;
108         }
109     }
110
111     # Dequeue items from $index onward
112     my @tmp;
113     while (@$queue > $index) {
114         unshift(@tmp, pop(@$queue))
115     }
116
117     # Add new items to the queue
118     push(@$queue, map { shared_clone($_) } @_);
119
120     # Add previous items back onto the queue
121     push(@$queue, @tmp);
122
123     # Soup's up
124     cond_signal(@$queue);
125 }
126
127 # Remove items from anywhere in a queue
128 sub extract
129 {
130     my $queue = shift;
131     lock(@$queue);
132
133     my $index = @_ ? $validate_index->(shift) : 0;
134     my $count = @_ ? $validate_count->(shift) : 1;
135
136     # Support negative indices
137     if ($index < 0) {
138         $index += @$queue;
139         if ($index < 0) {
140             $count += $index;
141             return if ($count <= 0);            # Beyond the head of the queue
142             return $queue->dequeue_nb($count);  # Extract from the head
143         }
144     }
145
146     # Dequeue items from $index+$count onward
147     my @tmp;
148     while (@$queue > ($index+$count)) {
149         unshift(@tmp, pop(@$queue))
150     }
151
152     # Extract desired items
153     my @items;
154     unshift(@items, pop(@$queue)) while (@$queue > $index);
155
156     # Add back any removed items
157     push(@$queue, @tmp);
158
159     # Return single item
160     return $items[0] if ($count == 1);
161
162     # Return multiple items
163     return @items;
164 }
165
166 ### Internal Functions ###
167
168 # Check value of the requested index
169 $validate_index = sub {
170     my $index = shift;
171
172     if (! defined($index) ||
173         ! looks_like_number($index) ||
174         (int($index) != $index))
175     {
176         require Carp;
177         my ($method) = (caller(1))[3];
178         $method =~ s/Thread::Queue:://;
179         $index = 'undef' if (! defined($index));
180         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
181     }
182
183     return $index;
184 };
185
186 # Check value of the requested count
187 $validate_count = sub {
188     my $count = shift;
189
190     if (! defined($count) ||
191         ! looks_like_number($count) ||
192         (int($count) != $count) ||
193         ($count < 1))
194     {
195         require Carp;
196         my ($method) = (caller(1))[3];
197         $method =~ s/Thread::Queue:://;
198         $count = 'undef' if (! defined($count));
199         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
200     }
201
202     return $count;
203 };
204
205 1;
206
207 =head1 NAME
208
209 Thread::Queue - Thread-safe queues
210
211 =head1 VERSION
212
213 This document describes Thread::Queue version 2.12
214
215 =head1 SYNOPSIS
216
217     use strict;
218     use warnings;
219
220     use threads;
221     use Thread::Queue;
222
223     my $q = Thread::Queue->new();    # A new empty queue
224
225     # Worker thread
226     my $thr = threads->create(sub {
227                                 while (my $item = $q->dequeue()) {
228                                     # Do work on $item
229                                 }
230                              })->detach();
231
232     # Send work to the thread
233     $q->enqueue($item1, ...);
234
235
236     # Count of items in the queue
237     my $left = $q->pending();
238
239     # Non-blocking dequeue
240     if (defined(my $item = $q->dequeue_nb())) {
241         # Work on $item
242     }
243
244     # Get the second item in the queue without dequeuing anything
245     my $item = $q->peek(1);
246
247     # Insert two items into the queue just behind the head
248     $q->insert(1, $item1, $item2);
249
250     # Extract the last two items on the queue
251     my ($item1, $item2) = $q->extract(-2, 2);
252
253 =head1 DESCRIPTION
254
255 This module provides thread-safe FIFO queues that can be accessed safely by
256 any number of threads.
257
258 Any data types supported by L<threads::shared> can be passed via queues:
259
260 =over
261
262 =item Ordinary scalars
263
264 =item Array refs
265
266 =item Hash refs
267
268 =item Scalar refs
269
270 =item Objects based on the above
271
272 =back
273
274 Ordinary scalars are added to queues as they are.
275
276 If not already thread-shared, the other complex data types will be cloned
277 (recursively, if needed, and including any C<bless>ings and read-only
278 settings) into thread-shared structures before being placed onto a queue.
279
280 For example, the following would cause L<Thread::Queue> to create a empty,
281 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
282 and 'baz' from C<@ary> into it, and then place that shared reference onto
283 the queue:
284
285     my @ary = qw/foo bar baz/;
286     $q->enqueue(\@ary);
287
288 However, for the following, the items are already shared, so their references
289 are added directly to the queue, and no cloning takes place:
290
291     my @ary :shared = qw/foo bar baz/;
292     $q->enqueue(\@ary);
293
294     my $obj = &shared({});
295     $$obj{'foo'} = 'bar';
296     $$obj{'qux'} = 99;
297     bless($obj, 'My::Class');
298     $q->enqueue($obj);
299
300 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
301
302 =head1 QUEUE CREATION
303
304 =over
305
306 =item ->new()
307
308 Creates a new empty queue.
309
310 =item ->new(LIST)
311
312 Creates a new queue pre-populated with the provided list of items.
313
314 =back
315
316 =head1 BASIC METHODS
317
318 The following methods deal with queues on a FIFO basis.
319
320 =over
321
322 =item ->enqueue(LIST)
323
324 Adds a list of items onto the end of the queue.
325
326 =item ->dequeue()
327
328 =item ->dequeue(COUNT)
329
330 Removes the requested number of items (default is 1) from the head of the
331 queue, and returns them.  If the queue contains fewer than the requested
332 number of items, then the thread will be blocked until the requisite number
333 of items are available (i.e., until other threads <enqueue> more items).
334
335 =item ->dequeue_nb()
336
337 =item ->dequeue_nb(COUNT)
338
339 Removes the requested number of items (default is 1) from the head of the
340 queue, and returns them.  If the queue contains fewer than the requested
341 number of items, then it immediately (i.e., non-blocking) returns whatever
342 items there are on the queue.  If the queue is empty, then C<undef> is
343 returned.
344
345 =item ->pending()
346
347 Returns the number of items still in the queue.
348
349 =back
350
351 =head1 ADVANCED METHODS
352
353 The following methods can be used to manipulate items anywhere in a queue.
354
355 To prevent the contents of a queue from being modified by another thread
356 while it is being examined and/or changed, L<lock|threads::shared/"lock
357 VARIABLE"> the queue inside a local block:
358
359     {
360         lock($q);   # Keep other threads from changing the queue's contents
361         my $item = $q->peek();
362         if ($item ...) {
363             ...
364         }
365     }
366     # Queue is now unlocked
367
368 =over
369
370 =item ->peek()
371
372 =item ->peek(INDEX)
373
374 Returns an item from the queue without dequeuing anything.  Defaults to the
375 the head of queue (at index position 0) if no index is specified.  Negative
376 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
377 is the end of the queue, -2 is next to last, and so on).
378
379 If no items exists at the specified index (i.e., the queue is empty, or the
380 index is beyond the number of items on the queue), then C<undef> is returned.
381
382 Remember, the returned item is not removed from the queue, so manipulating a
383 C<peek>ed at reference affects the item on the queue.
384
385 =item ->insert(INDEX, LIST)
386
387 Adds the list of items to the queue at the specified index position (0
388 is the head of the list).  Any existing items at and beyond that position are
389 pushed back past the newly added items:
390
391     $q->enqueue(1, 2, 3, 4);
392     $q->insert(1, qw/foo bar/);
393     # Queue now contains:  1, foo, bar, 2, 3, 4
394
395 Specifying an index position greater than the number of items in the queue
396 just adds the list to the end.
397
398 Negative index positions are supported:
399
400     $q->enqueue(1, 2, 3, 4);
401     $q->insert(-2, qw/foo bar/);
402     # Queue now contains:  1, 2, foo, bar, 3, 4
403
404 Specifying a negative index position greater than the number of items in the
405 queue adds the list to the head of the queue.
406
407 =item ->extract()
408
409 =item ->extract(INDEX)
410
411 =item ->extract(INDEX, COUNT)
412
413 Removes and returns the specified number of items (defaults to 1) from the
414 specified index position in the queue (0 is the head of the queue).  When
415 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
416
417 This method is non-blocking, and will return only as many items as are
418 available to fulfill the request:
419
420     $q->enqueue(1, 2, 3, 4);
421     my $item  = $q->extract(2)     # Returns 3
422                                    # Queue now contains:  1, 2, 4
423     my @items = $q->extract(1, 3)  # Returns (2, 4)
424                                    # Queue now contains:  1
425
426 Specifying an index position greater than the number of items in the
427 queue results in C<undef> or an empty list being returned.
428
429     $q->enqueue('foo');
430     my $nada = $q->extract(3)      # Returns undef
431     my @nada = $q->extract(1, 3)   # Returns ()
432
433 Negative index positions are supported.  Specifying a negative index position
434 greater than the number of items in the queue may return items from the head
435 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
436 queue from the specified position (i.e. if queue size + index + count is
437 greater than zero):
438
439     $q->enqueue(qw/foo bar baz/);
440     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
441     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
442                                      # Queue now contains:  bar, baz
443     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
444
445 =back
446
447 =head1 NOTES
448
449 Queues created by L<Thread::Queue> can be used in both threaded and
450 non-threaded applications.
451
452 =head1 LIMITATIONS
453
454 Passing objects on queues may not work if the objects' classes do not support
455 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
456
457 Passing array/hash refs that contain objects may not work for Perl prior to
458 5.10.0.
459
460 =head1 SEE ALSO
461
462 Thread::Queue Discussion Forum on CPAN:
463 L<http://www.cpanforum.com/dist/Thread-Queue>
464
465 L<threads>, L<threads::shared>
466
467 =head1 MAINTAINER
468
469 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
470
471 =head1 LICENSE
472
473 This program is free software; you can redistribute it and/or modify it under
474 the same terms as Perl itself.
475
476 =cut