Commit | Line | Data |
---|---|---|
54c7876f JH |
1 | use strict; |
2 | use warnings; | |
3 | ||
BEGIN {
    # With PERL_CORE set, run from the 't' directory and put '../lib'
    # at the front of the module search path.
    if ($ENV{'PERL_CORE'}) {
        chdir('t');
        unshift(@INC, '../lib');
    }

    # Everything below needs interpreter threads; without them, emit a
    # TAP "skip all" plan and leave quietly.
    use Config;
    unless ($Config{'useithreads'}) {
        print("1..0 # Skip: Perl not compiled with 'useithreads'\n");
        exit(0);
    }
}
15 | ||
16 | use threads; | |
17 | use Thread::Queue; | |
18 | ||
# Load the test harness: the real Test::More everywhere except on
# Perl 5.8.0, which gets the bundled substitute instead.
if ($] != 5.008) {
    require Test::More;
}
else {
    require 't/test.pl';   # Test::More work-alike for Perl 5.8.0
}
Test::More->import();
plan(tests => 81);
26 | ||
### Basic usage with multiple threads ###

# Number of reader threads started below.
my $nthreads = 5;

# The queue starts out holding the values 1 .. $nthreads.
my $q = Thread::Queue->new(1 .. $nthreads);
ok($q, 'New queue');
is($q->pending(), $nthreads, 'Pre-populated queue count');
34 | ||
# Thread body: pull items off the shared queue $q one at a time until
# the end marker (-1) arrives.  Each item is checked to be >= 1, then
# the thread sleeps for a random fraction of a second before fetching
# the next one.  A final passing test records that the thread finished.
sub reader {
    my $id = threads->tid();

    while (1) {
        my $el = $q->dequeue();
        last if ($el == -1);        # End marker: stop reading

        ok($el >= 1, "Thread $id got $el");
        select(undef, undef, undef, rand(1));   # Sub-second sleep
    }

    ok(1, "Thread $id done");
}
43 | ||
# Start the reader threads.
my @threads = map { threads->create('reader') } (1 .. $nthreads);

# Feed the queue slowly, pausing a random fraction of a second before
# each item.
foreach my $item (1 .. 20) {
    select(undef, undef, undef, rand(1));
    $q->enqueue($item);
}

# One end marker for each thread
$q->enqueue((-1) x $nthreads);

# Wait for every reader to finish, then forget the thread objects.
foreach my $thr (@threads) {
    $thr->join();
}
undef(@threads);

is($q->pending(), 0, 'Empty queue');
58 | ||
59 | ||
### ->dequeue_nb() test ###

# Start from a fresh, empty queue.
$q = Thread::Queue->new();
ok($q, 'New queue');
is($q->pending(), 0, 'Empty queue');

my @items = qw/foo bar baz/;
$q->enqueue(@items);

# A single thread drains the queue without blocking, checks that the
# items come out in insertion order, and then reports back by
# enqueueing 'done'.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    # dequeue_nb() signals an empty queue with undef, so test for
    # definedness: a plain truth test would also stop on a false but
    # valid item such as 0 or ''.
    while (defined(my $el = $q->dequeue_nb())) {
        is($el, shift(@items), "Thread got $el");
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');
81 | ||
82 | ||
### ->dequeue(COUNT) test ###

# Number of items each dequeue() call asks for.
my $count = 3;

# Thread body: repeatedly take $count items at once from the shared
# queue $q and check that exactly that many arrived.  After each batch
# the thread sleeps for a random fraction of a second; it stops once a
# batch starts with the end marker 0.
sub reader2 {
    my $id = threads->tid();
    my @el;

    do {
        @el = $q->dequeue($count);
        is(scalar(@el), $count, "Thread $id got @el");
        select(undef, undef, undef, rand(1));   # Sub-second sleep
    } until ($el[0] == 0);

    return;
}
96 | ||
# Start the batch readers.
push(@threads, map { threads->create('reader2') } (1 .. $nthreads));

# Payload first, then one batch of end markers (0) per thread.
$q->enqueue(1 .. (4 * $count * $nthreads));
$q->enqueue((0) x ($count * $nthreads));

# Wait for every reader to finish, then forget the thread objects.
foreach my $thr (@threads) {
    $thr->join();
}
undef(@threads);

is($q->pending(), 0, 'Empty queue');
106 | ||
107 | ||
### ->dequeue_nb(COUNT) test ###

@items = qw/foo bar baz qux exit/;
$q->enqueue(@items);
is($q->pending(), scalar(@items), 'Queue count');

# A single thread drains the queue two items at a time without
# blocking.  The item count is odd, so the last batch holds only
# 'exit'.  Afterwards the thread reports back by enqueueing 'done'.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    while (my @el = $q->dequeue_nb(2)) {
        is($el[0], shift(@items), "Thread got $el[0]");
        if ($el[0] ne 'exit') {
            # Ordinary batch: verify its second item as well
            is($el[1], shift(@items), "Thread got $el[1]");
        }
        else {
            # Final, short batch
            is(scalar(@el), 1, 'Thread to exit');
        }
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');
131 | ||
132 | # EOF |