This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
svleak.t: Fix a mad failure
[perl5.git] / dist / Thread-Queue / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
23e2fda9
JH
6our $VERSION = '2.12';
7$VERSION = eval $VERSION;
54c7876f 8
09782346 9use threads::shared 1.21;
ac9d3a9d 10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
54c7876f 11
09782346
JH
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
54c7876f 15# Predeclarations for internal functions
09782346 16my ($validate_count, $validate_index);
54c7876f
JH
17
# Constructor.  Builds the queue's backing array (declared ':shared'), seeds
# it with a shared_clone() of every argument after the class name, and
# returns a reference to that array blessed into the invoking class.
sub new
{
    my ($class, @items) = @_;

    my @queue :shared;
    push(@queue, shared_clone($_)) for (@items);

    return bless(\@queue, $class);
}
25
# Append items to the tail of the queue.
# Every argument is run through shared_clone() before being pushed.  If the
# queue is non-empty once the push is done, cond_signal() is issued on it.
sub enqueue
{
    my $self = shift;
    lock(@$self);

    # push() reports the queue length after the new items were appended
    my $length = push(@$self, map { shared_clone($_) } @_);

    return $length && cond_signal(@$self);
}
34
# Report how many items are currently on the queue.
# The queue is locked while its length is read.
sub pending
{
    my ($self) = @_;
    lock(@$self);

    my $size = @$self;
    return $size;
}
42
# Remove and return COUNT items (default: 1) from the head of the queue.
# The call waits via cond_wait() until the queue holds at least COUNT items.
# An explicit COUNT is checked by $validate_count, which croaks if invalid.
# Returns the item itself when COUNT is 1, otherwise the list of items.
sub dequeue
{
    my $self = shift;
    lock(@$self);

    my $wanted = 1;
    $wanted = $validate_count->(shift) if (@_);

    # Sleep until the queue has grown large enough to satisfy the request
    while (@$self < $wanted) {
        cond_wait(@$self);
    }

    # More items than this call will take: pass the signal on
    if (@$self > $wanted) {
        cond_signal(@$self);
    }

    # Single item requested
    if ($wanted == 1) {
        return shift(@$self);
    }

    # Several items requested - take them off the head in queue order
    my @taken = map { shift(@$self) } (1..$wanted);
    return @taken;
}
63
# Non-blocking dequeue: remove and return up to COUNT items (default: 1)
# from the head of the queue without ever waiting.
# An explicit COUNT is checked by $validate_count, which croaks if invalid.
# With COUNT == 1 the result of a single shift() is returned (undef when the
# queue is empty); otherwise whatever items are available, up to COUNT.
sub dequeue_nb
{
    my $self = shift;
    lock(@$self);

    my $wanted = 1;
    $wanted = $validate_count->(shift) if (@_);

    # Single item requested
    if ($wanted == 1) {
        return shift(@$self);
    }

    # Several items requested - stop early once the queue runs dry
    my @taken;
    foreach my $n (1..$wanted) {
        last unless (@$self);
        push(@taken, shift(@$self));
    }
    return @taken;
}
83
# Look at an item without taking it off the queue.
# INDEX defaults to 0 (the head); an explicit INDEX is checked by
# $validate_index, which croaks if invalid.  The element is read with plain
# array indexing, so negative values count from the tail.
sub peek
{
    my $self = shift;
    lock(@$self);

    my $pos = 0;
    $pos = $validate_index->(shift) if (@_);

    return $self->[$pos];
}
92
# Insert items at position INDEX of the queue.
# INDEX is mandatory and is checked by $validate_index (croaks if invalid).
# Items already at or beyond INDEX end up after the inserted ones.  A
# negative INDEX counts from the tail and is clamped to the head.
sub insert
{
    my $self = shift;
    lock(@$self);

    my $pos = $validate_index->(shift);

    # No items supplied - leave the queue untouched
    if (! @_) {
        return;
    }

    # Convert a negative index into an offset from the head
    if ($pos < 0) {
        $pos += @$self;
        $pos = 0 if ($pos < 0);
    }

    # Set aside everything from the insertion point onward, keeping order.
    # NOTE(review): pop/unshift is used rather than splice, presumably
    # because splice is not available on shared arrays - confirm against
    # the threads::shared documentation.
    my @tail;
    unshift(@tail, pop(@$self)) while (@$self > $pos);

    # Append the (cloned) new items ...
    push(@$self, map { shared_clone($_) } @_);

    # ... followed by the items that were set aside
    push(@$self, @tail);

    # Announce that the queue has content
    cond_signal(@$self);
}
126
# Remove and return COUNT items (default: 1) starting at position INDEX
# (default: 0) of the queue.  Never waits: only the items actually present
# in the requested span are returned.
# Both arguments are checked by the validators, which croak if invalid.
# Returns a single item when COUNT is 1, otherwise a list.
sub extract
{
    my $self = shift;
    lock(@$self);

    my $pos = 0;
    $pos = $validate_index->(shift) if (@_);
    my $wanted = 1;
    $wanted = $validate_count->(shift) if (@_);

    # Convert a negative index into an offset from the head
    if ($pos < 0) {
        $pos += @$self;
        if ($pos < 0) {
            # The span starts before the head: drop the part that falls
            # off the front and serve the rest (if any) from the head
            $wanted += $pos;
            return if ($wanted <= 0);
            return $self->dequeue_nb($wanted);
        }
    }

    # Set aside the items that lie beyond the requested span.
    # NOTE(review): pop/unshift is used rather than splice, presumably
    # because splice is not available on shared arrays - confirm against
    # the threads::shared documentation.
    my $end = $pos + $wanted;
    my @tail;
    unshift(@tail, pop(@$self)) while (@$self > $end);

    # Whatever remains at or after INDEX is the requested span
    my @taken;
    while (@$self > $pos) {
        unshift(@taken, pop(@$self));
    }

    # Restore the items that were set aside
    push(@$self, @tail);

    # Single item requested
    if ($wanted == 1) {
        return $taken[0];
    }

    # Several items requested
    return @taken;
}
165
166### Internal Functions ###
167
54c7876f
JH
# Validator for 'index' arguments.
# Returns the value unchanged when it is defined, looks like a number and
# is integral (negative values are accepted).  Otherwise croaks, naming the
# calling method with its 'Thread::Queue::' prefix stripped.
$validate_index = sub {
    my ($index) = @_;

    # Acceptable value - hand it straight back
    return $index
        if (defined($index) &&
            looks_like_number($index) &&
            (int($index) == $index));

    # Anything else is reported from the caller's point of view
    require Carp;
    my $method = (caller(1))[3];    # Fully qualified name of the calling sub
    $method =~ s/Thread::Queue:://;
    $index = 'undef' unless (defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
};
185
# Validator for 'count' arguments.
# Returns the value unchanged when it is defined, looks like a number, is
# integral and is at least 1.  Otherwise croaks, naming the calling method
# with its 'Thread::Queue::' prefix stripped.
$validate_count = sub {
    my ($count) = @_;

    # Acceptable value - hand it straight back
    return $count
        if (defined($count) &&
            looks_like_number($count) &&
            (int($count) == $count) &&
            ($count >= 1));

    # Anything else is reported from the caller's point of view
    require Carp;
    my $method = (caller(1))[3];    # Fully qualified name of the calling sub
    $method =~ s/Thread::Queue:://;
    $count = 'undef' unless (defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
204
2051;
9c6f8578 206
d516a115
JH
207=head1 NAME
208
54c7876f
JH
209Thread::Queue - Thread-safe queues
210
211=head1 VERSION
212
23e2fda9 213This document describes Thread::Queue version 2.12
d516a115
JH
214
215=head1 SYNOPSIS
216
54c7876f
JH
217 use strict;
218 use warnings;
219
220 use threads;
d516a115 221 use Thread::Queue;
54c7876f
JH
222
223 my $q = Thread::Queue->new(); # A new empty queue
224
225 # Worker thread
226 my $thr = threads->create(sub {
227 while (defined(my $item = $q->dequeue())) {
228 # Do work on $item
229 }
230 })->detach();
231
232 # Send work to the thread
233 $q->enqueue($item1, ...);
234
235
236 # Count of items in the queue
237 my $left = $q->pending();
238
239 # Non-blocking dequeue
240 if (defined(my $item = $q->dequeue_nb())) {
241 # Work on $item
242 }
243
244 # Get the second item in the queue without dequeuing anything
245 my $item = $q->peek(1);
246
247 # Insert two items into the queue just behind the head
248 $q->insert(1, $item1, $item2);
249
250 # Extract the last two items on the queue
251 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 252
a99072da
HM
253=head1 DESCRIPTION
254
54c7876f
JH
255This module provides thread-safe FIFO queues that can be accessed safely by
256any number of threads.
257
258Any data types supported by L<threads::shared> can be passed via queues:
259
260=over
261
262=item Ordinary scalars
263
264=item Array refs
265
266=item Hash refs
267
268=item Scalar refs
269
270=item Objects based on the above
271
272=back
273
274Ordinary scalars are added to queues as they are.
275
276If not already thread-shared, the other complex data types will be cloned
277(recursively, if needed, and including any C<bless>ings and read-only
278settings) into thread-shared structures before being placed onto a queue.
a99072da 279
54c7876f
JH
280For example, the following would cause L<Thread::Queue> to create an empty,
281shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
282and 'baz' from C<@ary> into it, and then place that shared reference onto
283the queue:
a99072da 284
54c7876f
JH
285 my @ary = qw/foo bar baz/;
286 $q->enqueue(\@ary);
a99072da 287
54c7876f
JH
288However, for the following, the items are already shared, so their references
289are added directly to the queue, and no cloning takes place:
a99072da 290
54c7876f
JH
291 my @ary :shared = qw/foo bar baz/;
292 $q->enqueue(\@ary);
a99072da 293
54c7876f
JH
294 my $obj = &shared({});
295 $$obj{'foo'} = 'bar';
296 $$obj{'qux'} = 99;
297 bless($obj, 'My::Class');
298 $q->enqueue($obj);
a99072da 299
54c7876f 300See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 301
54c7876f 302=head1 QUEUE CREATION
a99072da 303
54c7876f 304=over
a99072da 305
54c7876f 306=item ->new()
a99072da 307
54c7876f 308Creates a new empty queue.
a99072da 309
54c7876f 310=item ->new(LIST)
a99072da 311
54c7876f 312Creates a new queue pre-populated with the provided list of items.
a99072da
HM
313
314=back
315
54c7876f 316=head1 BASIC METHODS
a99072da 317
54c7876f 318The following methods deal with queues on a FIFO basis.
bbc7dcd2 319
54c7876f 320=over
d516a115 321
54c7876f 322=item ->enqueue(LIST)
d21067e0 323
54c7876f 324Adds a list of items onto the end of the queue.
d21067e0 325
54c7876f 326=item ->dequeue()
a99072da 327
54c7876f 328=item ->dequeue(COUNT)
d21067e0 329
54c7876f
JH
330Removes the requested number of items (default is 1) from the head of the
331queue, and returns them. If the queue contains fewer than the requested
332number of items, then the thread will be blocked until the requisite number
333of items are available (i.e., until other threads C<enqueue> more items).
a99072da 334
54c7876f
JH
335=item ->dequeue_nb()
336
337=item ->dequeue_nb(COUNT)
338
339Removes the requested number of items (default is 1) from the head of the
340queue, and returns them. If the queue contains fewer than the requested
341number of items, then it immediately (i.e., non-blocking) returns whatever
342items there are on the queue. If the queue is empty, then C<undef> is
343returned.
344
345=item ->pending()
346
347Returns the number of items still in the queue.
348
349=back
350
351=head1 ADVANCED METHODS
352
353The following methods can be used to manipulate items anywhere in a queue.
354
355To prevent the contents of a queue from being modified by another thread
356while it is being examined and/or changed, L<lock|threads::shared/"lock
357VARIABLE"> the queue inside a local block:
358
359 {
360 lock($q); # Keep other threads from changing the queue's contents
361 my $item = $q->peek();
362 if ($item ...) {
363 ...
364 }
365 }
366 # Queue is now unlocked
367
368=over
369
370=item ->peek()
371
372=item ->peek(INDEX)
373
374Returns an item from the queue without dequeuing anything. Defaults to the
375head of the queue (at index position 0) if no index is specified. Negative
376index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
377is the end of the queue, -2 is next to last, and so on).
378
379If no item exists at the specified index (i.e., the queue is empty, or the
380index is beyond the number of items on the queue), then C<undef> is returned.
381
382Remember, the returned item is not removed from the queue, so manipulating a
383C<peek>ed at reference affects the item on the queue.
384
385=item ->insert(INDEX, LIST)
386
387Adds the list of items to the queue at the specified index position (0
388is the head of the list). Any existing items at and beyond that position are
389pushed back past the newly added items:
390
391 $q->enqueue(1, 2, 3, 4);
392 $q->insert(1, qw/foo bar/);
393 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 394
54c7876f
JH
395Specifying an index position greater than the number of items in the queue
396just adds the list to the end.
83272a45 397
54c7876f
JH
398Negative index positions are supported:
399
400 $q->enqueue(1, 2, 3, 4);
401 $q->insert(-2, qw/foo bar/);
402 # Queue now contains: 1, 2, foo, bar, 3, 4
403
404Specifying a negative index position greater than the number of items in the
405queue adds the list to the head of the queue.
406
407=item ->extract()
408
409=item ->extract(INDEX)
410
411=item ->extract(INDEX, COUNT)
412
413Removes and returns the specified number of items (defaults to 1) from the
414specified index position in the queue (0 is the head of the queue). When
415called with no arguments, C<extract> operates the same as C<dequeue_nb>.
416
417This method is non-blocking, and will return only as many items as are
418available to fulfill the request:
419
420 $q->enqueue(1, 2, 3, 4);
421 my $item = $q->extract(2); # Returns 3
422 # Queue now contains: 1, 2, 4
423 my @items = $q->extract(1, 3); # Returns (2, 4)
424 # Queue now contains: 1
425
426Specifying an index position greater than the number of items in the
427queue results in C<undef> or an empty list being returned.
428
429 $q->enqueue('foo');
430 my $nada = $q->extract(3); # Returns undef
431 my @nada = $q->extract(1, 3); # Returns ()
432
433Negative index positions are supported. Specifying a negative index position
434greater than the number of items in the queue may return items from the head
435of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
436queue from the specified position (i.e. if queue size + index + count is
437greater than zero):
438
439 $q->enqueue(qw/foo bar baz/);
440 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
441 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
442 # Queue now contains: bar, baz
443 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
444
445=back
446
7fb1c73b
JH
447=head1 NOTES
448
449Queues created by L<Thread::Queue> can be used in both threaded and
450non-threaded applications.
451
54c7876f
JH
452=head1 LIMITATIONS
453
454Passing objects on queues may not work if the objects' classes do not support
455sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
456
457Passing array/hash refs that contain objects may not work for Perl prior to
4585.10.0.
459
460=head1 SEE ALSO
461
462Thread::Queue Discussion Forum on CPAN:
463L<http://www.cpanforum.com/dist/Thread-Queue>
464
54c7876f
JH
465L<threads>, L<threads::shared>
466
467=head1 MAINTAINER
468
469Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
470
471=head1 LICENSE
472
473This program is free software; you can redistribute it and/or modify it under
474the same terms as Perl itself.
475
476=cut