This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Upgrade to Thread::Queue 3.01
[perl5.git] / dist / Thread-Queue / t / 09_ended.t
1 use strict;
2 use warnings;
3
4 use Config;
5
6 BEGIN {
7     if (! $Config{'useithreads'}) {
8         print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
9         exit(0);
10     }
11     if (! $Config{'d_select'}) {
12         print("1..0 # SKIP 'select()' not available for testing\n");
13         exit(0);
14     }
15 }
16
17 use threads;
18 use Thread::Queue;
19
20 use Test::More;
21
22 my $num_threads = 3;
23 my $cycles = 2;
24 my $count = 2;
25 plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;
26
27 # Test for end() while threads are blocked and no more items in queue
28 {
29     my @items = 1..($num_threads*$cycles*$count);
30     my $q = Thread::Queue->new(@items);
31     my $r = Thread::Queue->new();
32
33     my @threads;
34     for my $ii (1..$num_threads) {
35         push @threads, threads->create( sub {
36             # Thread will loop until no more work is coming
37             LOOP:
38             while (my @set = $q->dequeue($count)) {
39                 foreach my $item (@set) {
40                     last LOOP if (! defined($item));
41                     pass("'$item' read from queue in thread $ii");
42                 }
43                 select(undef, undef, undef, rand(1));
44                 $r->enqueue($ii);
45             }
46             pass("Thread $ii exiting");
47         });
48     }
49
50     # Make sure there's nothing in the queue and threads are blocking
51     for my $ii (1..($num_threads*$cycles)) {
52         $r->dequeue();
53     }
54     sleep(1);
55     threads->yield();
56
57     is($q->pending(), 0, 'Queue is empty');
58
59     # Signal no more work is coming
60     $q->end();
61
62     is($q->pending(), undef, 'Queue is ended');
63
64     for my $thread (@threads) {
65         $thread->join;
66         pass($thread->tid." joined");
67     }
68 }
69
70 # Test for end() while threads are blocked and items still remain in queue
71 {
72     my @items = 1..($num_threads*$cycles*$count + 1);
73     my $q = Thread::Queue->new(@items);
74     my $r = Thread::Queue->new();
75
76     my @threads;
77     for my $ii (1..$num_threads) {
78         push @threads, threads->create( sub {
79             # Thread will loop until no more work is coming
80             LOOP:
81             while (my @set = $q->dequeue($count)) {
82                 foreach my $item (@set) {
83                     last LOOP if (! defined($item));
84                     pass("'$item' read from queue in thread $ii");
85                 }
86                 select(undef, undef, undef, rand(1));
87                 $r->enqueue($ii);
88             }
89             pass("Thread $ii exiting");
90         });
91     }
92
93     # Make sure there's nothing in the queue and threads are blocking
94     for my $ii (1..($num_threads*$cycles)) {
95         $r->dequeue();
96     }
97     sleep(1);
98     threads->yield();
99
100     is($q->pending(), 1, 'Queue has one left');
101
102     # Signal no more work is coming
103     $q->end();
104
105     for my $thread (@threads) {
106         $thread->join;
107         pass($thread->tid." joined");
108     }
109
110     is($q->pending(), undef, 'Queue is ended');
111 }
112
113 # Test of end() send while items in queue
114 {
115     my @items = 1..($num_threads*$cycles*$count + 1);
116     my $q = Thread::Queue->new(@items);
117
118     my @threads;
119     for my $ii (1..$num_threads) {
120         push @threads, threads->create( sub {
121             # Thread will loop until no more work is coming
122             LOOP:
123             while (my @set = $q->dequeue($count)) {
124                 foreach my $item (@set) {
125                     last LOOP if (! defined($item));
126                     pass("'$item' read from queue in thread $ii");
127                 }
128                 select(undef, undef, undef, rand(1));
129             }
130             pass("Thread $ii exiting");
131         });
132     }
133
134     # Signal no more work is coming to the blocked threads, they
135     # should unblock.
136     $q->end();
137
138     for my $thread (@threads) {
139         $thread->join;
140         pass($thread->tid." joined");
141     }
142 }
143
144 exit(0);
145
146 # EOF