Commit | Line | Data |
---|---|---|
1fd4700e JH |
1 | use strict; |
2 | use warnings; | |
3 | ||
4 | use Config; | |
5 | ||
# Compile-time guard: emit a TAP "skip everything" line and stop when the
# running perl lacks a build option this script relies on.
BEGIN {
    # Each entry pairs a %Config key with the skip line printed when that
    # key is false.  Entries are checked in order; the first miss wins.
    my @requirements = (
        [ 'useithreads', "1..0 # SKIP Perl not compiled with 'useithreads'\n" ],
        [ 'd_select',    "1..0 # SKIP 'select()' not available for testing\n" ],
    );

    for my $requirement (@requirements) {
        my ($config_key, $skip_line) = @{$requirement};
        next if $Config{$config_key};
        print($skip_line);
        exit(0);
    }
}
16 | ||
17 | use threads; | |
18 | use Thread::Queue; | |
19 | ||
20 | use Test::More; | |
21 | ||
# Sizing knobs shared by all three scenarios below.
my $num_threads = 3;    # worker threads started per scenario
my $cycles      = 2;    # with $count, sets how many items are preloaded
my $count       = 2;    # items requested by each dequeue() call

# Expected number of assertions across the three scenarios:
#   3 * $base_items   one pass() per preloaded item in each scenario
#   6 * $num_threads  an "exiting" and a "joined" pass() per thread, x3
#   6                 four is() checks on pending(), plus the one extra item
#                     preloaded in each of the second and third scenarios
my $base_items = $num_threads * $cycles * $count;
plan(tests => 3*$base_items + 6*$num_threads + 6);
26 | ||
# Scenario: end() is called after the work queue has been emptied, while the
# worker threads are expected to be waiting inside dequeue().
{
    # $q carries the work items; $r carries one report per finished batch.
    my $q = Thread::Queue->new(1 .. $num_threads*$cycles*$count);
    my $r = Thread::Queue->new();

    # Worker body.  Takes its thread index as the only argument and keeps
    # pulling $count items at a time until dequeue() hands back nothing.
    my $consume = sub {
        my ($ii) = @_;

        BATCH:
        while (1) {
            my @set = $q->dequeue($count);
            last BATCH unless @set;

            for my $item (@set) {
                last BATCH unless defined($item);
                pass("'$item' read from queue in thread $ii");
            }

            # Pause for a random fraction of a second, then report the batch
            select(undef, undef, undef, rand(1));
            $r->enqueue($ii);
        }

        pass("Thread $ii exiting");
    };

    my @workers;
    push @workers, threads->create($consume, $_) for 1 .. $num_threads;

    # Collect one report per batch; after the last one every item has been
    # taken.  The sleep/yield gives the workers time to re-enter dequeue().
    $r->dequeue() for 1 .. $num_threads*$cycles;
    sleep(1);
    threads->yield();

    is($q->pending(), 0, 'Queue is empty');

    # Tell the workers that nothing more will be queued
    $q->end();

    is($q->pending(), undef, 'Queue is ended');

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }
}
69 | ||
# Scenario: end() is called while the worker threads are expected to be
# waiting inside dequeue() and a single item is still sitting in the queue.
{
    # One item more than the workers can take in full batches of $count.
    # $r carries one report per finished batch.
    my $q = Thread::Queue->new(1 .. $num_threads*$cycles*$count + 1);
    my $r = Thread::Queue->new();

    # Worker body.  Takes its thread index as the only argument and keeps
    # pulling $count items at a time until dequeue() hands back nothing.
    my $consume = sub {
        my ($ii) = @_;

        BATCH:
        while (1) {
            my @set = $q->dequeue($count);
            last BATCH unless @set;

            for my $item (@set) {
                last BATCH unless defined($item);
                pass("'$item' read from queue in thread $ii");
            }

            # Pause for a random fraction of a second, then report the batch
            select(undef, undef, undef, rand(1));
            $r->enqueue($ii);
        }

        pass("Thread $ii exiting");
    };

    my @workers;
    push @workers, threads->create($consume, $_) for 1 .. $num_threads;

    # Collect one report per full batch; what is left afterwards is the
    # single extra item.  The sleep/yield gives the workers time to
    # re-enter dequeue().
    $r->dequeue() for 1 .. $num_threads*$cycles;
    sleep(1);
    threads->yield();

    is($q->pending(), 1, 'Queue has one left');

    # Tell the workers that nothing more will be queued
    $q->end();

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }

    # Checked only after every worker has been joined
    is($q->pending(), undef, 'Queue is ended');
}
112 | ||
# Scenario: end() is sent right after the workers are started, without
# waiting for them to take anything from the queue first.
{
    my $q = Thread::Queue->new(1 .. $num_threads*$cycles*$count + 1);

    # Worker body.  Takes its thread index as the only argument and keeps
    # pulling $count items at a time until dequeue() hands back nothing.
    my $consume = sub {
        my ($ii) = @_;

        BATCH:
        while (1) {
            my @set = $q->dequeue($count);
            last BATCH unless @set;

            for my $item (@set) {
                last BATCH unless defined($item);
                pass("'$item' read from queue in thread $ii");
            }

            # Pause for a random fraction of a second between batches
            select(undef, undef, undef, rand(1));
        }

        pass("Thread $ii exiting");
    };

    my @workers;
    push @workers, threads->create($consume, $_) for 1 .. $num_threads;

    # Tell the workers that nothing more will be queued; any of them that
    # end up waiting in dequeue() should be released.
    $q->end();

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }
}
143 | ||
144 | exit(0); | |
145 | ||
146 | # EOF |