| 1 | use strict; |
| 2 | use warnings; |
| 3 | |
# Compile-time guard: on a perl built without ithreads, emit a TAP
# "skip everything" plan and exit successfully before the 'use threads'
# line further down is compiled.
BEGIN {
    use Config;
    unless ($Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
}
| 11 | |
| 12 | use threads; |
| 13 | use Thread::Queue; |
| 14 | |
# Load the test functions at run time.  Every perl except 5.8.0 uses the
# real Test::More; 5.8.0 falls back to the t/test.pl work-alike.  Either
# way the functions are then imported from the Test::More namespace.
if ($] != 5.008) {
    require Test::More;
}
else {
    require 't/test.pl';    # Test::More work-alike for Perl 5.8.0
}
Test::More->import();

# Fixed plan: the total number of assertions run by the main thread and
# by all worker threads together.
plan('tests' => 81);
| 22 | |
### Basic usage with multiple threads ###

# Number of worker threads; the queue is also seeded with this many items.
my $nthreads = 5;

# Create the queue pre-populated with 1..$nthreads and verify both the
# object and its initial item count.
my $q = Thread::Queue->new(1 .. $nthreads);
ok($q, 'New queue');
is($q->pending(), $nthreads, 'Pre-populated queue count');
| 30 | |
# Worker for the basic-usage test.
#
# Repeatedly takes one item from the queue $q with a blocking dequeue().
# Each item other than -1 is asserted to be >= 1, after which the thread
# pauses for a random fraction of a second.  An item equal to -1 is the
# end marker: the loop stops and a final "done" assertion is recorded.
sub reader {
    my $id = threads->tid();

    while (1) {
        my $el = $q->dequeue();
        last if ($el == -1);

        ok($el >= 1, "Thread $id got $el");
        select(undef, undef, undef, rand(1));    # sub-second sleep
    }

    ok(1, "Thread $id done");
}
| 39 | |
# Start one reader() thread per worker slot.
my @threads;
for (1 .. $nthreads) {
    push(@threads, threads->create('reader'));
}

# Feed 20 more items into the queue, pausing for a random fraction of a
# second before each one.
for my $item (1 .. 20) {
    select(undef, undef, undef, rand(1));
    $q->enqueue($item);
}

# One end marker for each thread
$q->enqueue((-1) x $nthreads);

# Wait for every worker to finish, then drop the thread objects.
for my $thr (@threads) {
    $thr->join();
}
undef(@threads);

is($q->pending(), 0, 'Empty queue');
| 54 | |
| 55 | |
### ->dequeue_nb() test ###

# Start over with a fresh, empty queue.
$q = Thread::Queue->new();
ok($q, 'New queue');
is($q->pending(), 0, 'Empty queue');

# Items the child thread is expected to receive, in this order.
my @items = qw/foo bar baz/;
$q->enqueue(@items);

# Drain the queue from a child thread without blocking, then have the
# thread report completion by enqueueing 'done'.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');

    # dequeue_nb() is documented to return undef once the queue is empty,
    # so test for definedness rather than truth: a false-but-valid item
    # such as 0 or '' must not end the loop early.
    while (defined(my $el = $q->dequeue_nb())) {
        is($el, shift(@items), "Thread got $el");
    }

    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

# The only item left must be the completion marker from the thread.
is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');
| 77 | |
| 78 | |
### ->dequeue(COUNT) test ###

# Number of items requested by each dequeue() call in reader2().
my $count = 3;

# Worker for the dequeue(COUNT) test.
#
# Repeatedly requests $count items from the queue $q in a single
# dequeue() call and asserts that exactly $count items came back, then
# pauses for a random fraction of a second.  The thread returns after
# the first batch whose leading item is 0.
sub reader2 {
    my $id = threads->tid();

    my @el;
    do {
        @el = $q->dequeue($count);
        is(scalar(@el), $count, "Thread $id got @el");
        select(undef, undef, undef, rand(1));    # sub-second sleep
    } until ($el[0] == 0);

    return;
}
| 92 | |
# Start one reader2() thread per worker slot.
for (1 .. $nthreads) {
    push(@threads, threads->create('reader2'));
}

# Items consumed when every thread takes one batch of $count.
my $per_round = $count * $nthreads;

# Payload: four rounds of non-zero items, followed by one round of zeros
# ($count zeros per thread) that serve as the termination batches.
$q->enqueue(1 .. 4 * $per_round);
$q->enqueue((0) x $per_round);

# Wait for every worker to finish, then drop the thread objects.
for my $thr (@threads) {
    $thr->join();
}
undef(@threads);

is($q->pending(), 0, 'Empty queue');
| 102 | |
| 103 | |
### ->dequeue_nb(COUNT) test ###

# Five items, so that taking them two at a time leaves 'exit' on its own
# in the final batch.
@items = qw/foo bar baz qux exit/;
$q->enqueue(@items);
is($q->pending(), scalar(@items), 'Queue count');

# Drain the queue from a child thread in non-blocking batches of up to
# two items, checking each item against the expected order, then have
# the thread report completion by enqueueing 'done'.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');

    while (1) {
        my @el = $q->dequeue_nb(2);
        last unless @el;    # nothing returned: the queue is drained

        is($el[0], shift(@items), "Thread got $el[0]");
        if ($el[0] ne 'exit') {
            is($el[1], shift(@items), "Thread got $el[1]");
        }
        else {
            # The last batch must hold the lone 'exit' item.
            is(scalar(@el), 1, 'Thread to exit');
        }
    }

    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

# The only item left must be the completion marker from the thread.
is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');

exit(0);
| 129 | |
| 130 | # EOF |