threads 1.57
authorJerry D. Hedden <jdhedden@cpan.org>
Wed, 20 Dec 2006 10:30:21 +0000 (02:30 -0800)
committerRafael Garcia-Suarez <rgarciasuarez@gmail.com>
Thu, 21 Dec 2006 10:12:14 +0000 (10:12 +0000)
From: "Jerry D. Hedden" <jdhedden@yahoo.com>
Message-ID: <20061220183021.79793.qmail@web30205.mail.mud.yahoo.com>

p4raw-id: //depot/perl@29608

ext/threads/Changes
ext/threads/README
ext/threads/t/exit.t
ext/threads/t/join.t
ext/threads/t/state.t
ext/threads/t/thread.t
ext/threads/threads.pm
ext/threads/threads.xs

index 643155f..637926f 100755 (executable)
@@ -1,5 +1,12 @@
 Revision history for Perl extension threads.
 
+1.57 Wed Dec 20 13:10:26 EST 2006
+       - Fixes courtesy of Michael J. Pomraning
+           Eliminates self joins.
+           Eliminates multiple, simultaneous joins on a thread.
+           Protects thread->state variable with mutexes.
+           Checks that OS join call is successful.
+
 1.56 Fri Dec 15 12:18:47 EST 2006
        - More fixes to test suite
 
index ae21582..210d012 100755 (executable)
@@ -1,4 +1,4 @@
-threads version 1.56
+threads version 1.57
 ====================
 
 This module exposes interpreter threads to the Perl level.
index a23d394..49549a9 100644 (file)
@@ -56,7 +56,7 @@ my $rc = $thr->join();
 ok(! defined($rc), 'Exited: threads->exit()');
 
 
-run_perl(prog => 'use threads 1.56;' .
+run_perl(prog => 'use threads 1.57;' .
                  'threads->exit(86);' .
                  'exit(99);',
          nolib => ($ENV{PERL_CORE}) ? 0 : 1,
@@ -104,7 +104,7 @@ $rc = $thr->join();
 ok(! defined($rc), 'Exited: $thr->set_thread_exit_only');
 
 
-run_perl(prog => 'use threads 1.56 qw(exit thread_only);' .
+run_perl(prog => 'use threads 1.57 qw(exit thread_only);' .
                  'threads->create(sub { exit(99); })->join();' .
                  'exit(86);',
          nolib => ($ENV{PERL_CORE}) ? 0 : 1,
@@ -112,10 +112,11 @@ run_perl(prog => 'use threads 1.56 qw(exit thread_only);' .
 is($?>>8, 86, "'use threads 'exit' => 'thread_only'");
 
 
-my $out = run_perl(prog => 'use threads 1.56;' .
+my $out = run_perl(prog => 'use threads 1.57;' .
                            'threads->create(sub {' .
                            '    exit(99);' .
-                           '})->join();' .
+                           '});' .
+                           'sleep(1);' .
                            'exit(86);',
                    nolib => ($ENV{PERL_CORE}) ? 0 : 1,
                    switches => ($ENV{PERL_CORE}) ? [] : [ '-Mblib' ],
@@ -124,11 +125,12 @@ is($?>>8, 99, "exit(status) in thread");
 like($out, '1 finished and unjoined', "exit(status) in thread");
 
 
-$out = run_perl(prog => 'use threads 1.56 qw(exit thread_only);' .
+$out = run_perl(prog => 'use threads 1.57 qw(exit thread_only);' .
                         'threads->create(sub {' .
                         '   threads->set_thread_exit_only(0);' .
                         '   exit(99);' .
-                        '})->join();' .
+                        '});' .
+                        'sleep(1);' .
                         'exit(86);',
                 nolib => ($ENV{PERL_CORE}) ? 0 : 1,
                 switches => ($ENV{PERL_CORE}) ? [] : [ '-Mblib' ],
@@ -137,7 +139,7 @@ is($?>>8, 99, "set_thread_exit_only(0)");
 like($out, '1 finished and unjoined', "set_thread_exit_only(0)");
 
 
-run_perl(prog => 'use threads 1.56;' .
+run_perl(prog => 'use threads 1.57;' .
                  'threads->create(sub {' .
                  '   $SIG{__WARN__} = sub { exit(99); };' .
                  '   die();' .
index bebfd6d..0dd9514 100644 (file)
@@ -28,7 +28,7 @@ BEGIN {
     }
 
     $| = 1;
-    print("1..17\n");   ### Number of tests that will be run ###
+    print("1..20\n");   ### Number of tests that will be run ###
 };
 
 my $TEST;
@@ -181,4 +181,51 @@ if ($^O eq 'linux') {
     $_->join for map threads->create(sub{ok($_, "stress newCONSTSUB")}), 1..2;
 }
 
+{
+    my $go : shared = 0;
+
+    my $t = threads->create( sub {
+        lock($go);
+        cond_wait($go) until $go;
+    }); 
+
+    my $joiner = threads->create(sub { $_[0]->join }, $t);
+
+    threads->yield();
+    sleep 1;
+    eval { $t->join; };
+    ok(($@ =~ /^Thread already joined at/)?1:0, "Join pending join");
+
+    { lock($go); $go = 1; cond_signal($go); }
+    $joiner->join;
+}
+
+{
+    my $go : shared = 0;
+    my $t = threads->create( sub {
+        eval { threads->self->join; };
+        ok(($@ =~ /^Cannot join self/), "Join self");
+        lock($go); $go = 1; cond_signal($go);
+    });
+
+    { lock ($go); cond_wait($go) until $go; }
+    $t->join;
+}
+
+{
+    my $go : shared = 0;
+    my $t = threads->create( sub {
+        lock($go);  cond_wait($go) until $go;
+    });
+    my $joiner = threads->create(sub { $_[0]->join; }, $t);
+
+    threads->yield();
+    sleep 1;
+    eval { $t->detach };
+    ok(($@ =~ /^Cannot detach a joined thread at/)?1:0, "Detach pending join");
+
+    { lock($go); $go = 1; cond_signal($go); }
+    $joiner->join;
+}
+
 # EOF
index 80724db..b786840 100644 (file)
@@ -28,7 +28,7 @@ BEGIN {
     }
 
     $| = 1;
-    print("1..53\n");   ### Number of tests that will be run ###
+    print("1..59\n");   ### Number of tests that will be run ###
 };
 
 my $TEST;
@@ -180,11 +180,87 @@ ok($thr->is_detached(),   'thread detached');
 ok(! $thr->is_joinable(), 'thread not joinable');
 ok(threads->list(threads::joinable) == 0, 'thread joinable list');
 
-threads->create(sub {
-    ok(! threads->is_detached(), 'thread not detached');
-    ok(threads->list(threads::running) == 1, 'thread running list');
-    ok(threads->list(threads::joinable) == 0, 'thread joinable list');
-    ok(threads->list(threads::all) == 1, 'thread list');
-})->join();
+{
+    my $go : shared = 0;
+    my $t = threads->create( sub {
+        ok(! threads->is_detached(), 'thread not detached');
+        ok(threads->list(threads::running) == 1, 'thread running list');
+        ok(threads->list(threads::joinable) == 0, 'thread joinable list');
+        ok(threads->list(threads::all) == 1, 'thread list');
+        lock($go); $go = 1; cond_signal($go);
+    });
+
+    { lock ($go); cond_wait($go) until $go; }
+    $t->join;
+}
+
+{
+    my $rdy :shared = 0;
+    sub thr_ready
+    {
+        lock($rdy);
+        $rdy++;
+        cond_signal($rdy);
+    }
+
+    my $go :shared = 0;
+    sub thr_wait
+    {
+        lock($go);
+        cond_wait($go) until $go;
+    }
+
+    my $done :shared = 0;
+    sub thr_done
+    {
+        lock($done);
+        $done++;
+        cond_signal($done);
+    }
+
+    my $thr_routine = sub { thr_ready(); thr_wait(); thr_done(); };
+
+    # Create 8 threads:
+    #  3 running, blocking on $go
+    #  2 running, blocking on $go, join pending
+    #  2 running, blocking on join of above
+    #  1 finished, unjoined
+
+    for (1..3) { threads->create($thr_routine); }
+
+    foreach my $t (map {threads->create($thr_routine)} 1..2) {
+        threads->create(sub { thr_ready(); $_[0]->join; thr_done(); }, $t);
+    }
+    threads->create(sub { thr_ready(); thr_done(); });
+    {
+        lock($done);
+        cond_wait($done) until ($done == 1);
+    }
+    {
+        lock($rdy);
+        cond_wait($rdy) until ($rdy == 8);
+    }
+    threads->yield();
+    sleep(1);
+
+    ok(threads->list(threads::running) == 5, 'thread running list');
+    ok(threads->list(threads::joinable) == 1, 'thread joinable list');
+    ok(threads->list(threads::all) == 6, 'thread all list');
+
+    { lock($go); $go = 1; cond_broadcast($go); }
+    {
+        lock($done);
+        cond_wait($done) until ($done == 8);
+    }
+    threads->yield();
+    sleep(1);
+
+    ok(threads->list(threads::running) == 0, 'thread running list');
+    # Two awaiting join() have completed
+    ok(threads->list(threads::joinable) == 6, 'thread joinable list');
+    ok(threads->list(threads::all) == 6, 'thread all list');
+
+    for (threads->list) { $_->join; }
+}
 
 # EOF
index 3d3989e..7e71900 100644 (file)
@@ -171,7 +171,7 @@ package main;
 
 # bugid #24165
 
-run_perl(prog => 'use threads 1.56;' .
+run_perl(prog => 'use threads 1.57;' .
                  'sub a{threads->create(shift)} $t = a sub{};' .
                  '$t->tid; $t->join; $t->tid',
          nolib => ($ENV{PERL_CORE}) ? 0 : 1,
index eff472f..69214b3 100755 (executable)
@@ -5,7 +5,7 @@ use 5.008;
 use strict;
 use warnings;
 
-our $VERSION = '1.56';
+our $VERSION = '1.57';
 my $XS_VERSION = $VERSION;
 $VERSION = eval $VERSION;
 
@@ -133,7 +133,7 @@ threads - Perl interpreter-based threads
 
 =head1 VERSION
 
-This document describes threads version 1.56
+This document describes threads version 1.57
 
 =head1 SYNOPSIS
 
@@ -373,7 +373,7 @@ list of all non-joined, non-detached I<threads> objects.  In a scalar context,
 returns a count of the same.
 
 With a I<true> argument (using C<threads::running>), returns a list of all
-non-detached I<threads> objects that are still running.
+non-joined, non-detached I<threads> objects that are still running.
 
 With a I<false> argument (using C<threads::joinable>), returns a list of all
 non-joined, non-detached I<threads> objects that have finished running (i.e.,
@@ -949,7 +949,7 @@ L<threads> Discussion Forum on CPAN:
 L<http://www.cpanforum.com/dist/threads>
 
 Annotated POD for L<threads>:
-L<http://annocpan.org/~JDHEDDEN/threads-1.56/threads.pm>
+L<http://annocpan.org/~JDHEDDEN/threads-1.57/threads.pm>
 
 L<threads::shared>, L<perlthrtut>
 
index cc4e7c9..f15e40e 100755 (executable)
@@ -47,6 +47,7 @@ typedef perl_os_thread pthread_t;
 /* Values for 'state' member */
 #define PERL_ITHR_DETACHED              1
 #define PERL_ITHR_JOINED                2
+#define PERL_ITHR_UNCALLABLE            (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)
 #define PERL_ITHR_FINISHED              4
 #define PERL_ITHR_THREAD_EXIT_ONLY      8
 #define PERL_ITHR_NONVIABLE             16
@@ -138,7 +139,7 @@ S_ithread_clear(pTHX_ ithread *thread)
     PerlInterpreter *interp;
 
     assert(((thread->state & PERL_ITHR_FINISHED) &&
-            (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))
+            (thread->state & PERL_ITHR_UNCALLABLE))
                 ||
            (thread->state & PERL_ITHR_NONVIABLE));
 
@@ -187,7 +188,7 @@ S_ithread_destruct(pTHX_ ithread *thread)
         destroy = 1;
     } else if (! (thread->state & PERL_ITHR_FINISHED)) {
         destroy = 0;
-    } else if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) {
+    } else if (! (thread->state & PERL_ITHR_UNCALLABLE)) {
         destroy = 0;
     } else {
         thread->state |= PERL_ITHR_DESTROYED;
@@ -847,8 +848,10 @@ ithread_create(...)
             /* $thr->create() */
             classname = HvNAME(SvSTASH(SvRV(ST(0))));
             thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
+            MUTEX_LOCK(&thread->mutex);
             stack_size = thread->stack_size;
             exit_opt = thread->state & PERL_ITHR_THREAD_EXIT_ONLY;
+            MUTEX_UNLOCK(&thread->mutex);
         } else {
             /* threads->create() */
             classname = (char *)SvPV_nolen(ST(0));
@@ -952,6 +955,7 @@ ithread_list(...)
         int list_context;
         IV count = 0;
         int want_running = 0;
+        int state;
         dMY_POOL;
     PPCODE:
         /* Class method only */
@@ -974,19 +978,23 @@ ithread_list(...)
              thread != &MY_POOL.main_thread;
              thread = thread->next)
         {
+            MUTEX_LOCK(&thread->mutex);
+            state = thread->state;
+            MUTEX_UNLOCK(&thread->mutex);
+
             /* Ignore detached or joined threads */
-            if (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)) {
+            if (state & PERL_ITHR_UNCALLABLE) {
                 continue;
             }
 
             /* Filter per parameter */
             if (items > 1) {
                 if (want_running) {
-                    if (thread->state & PERL_ITHR_FINISHED) {
+                    if (state & PERL_ITHR_FINISHED) {
                         continue;   /* Not running */
                     }
                 } else {
-                    if (! (thread->state & PERL_ITHR_FINISHED)) {
+                    if (! (state & PERL_ITHR_FINISHED)) {
                         continue;   /* Still running - not joinable yet */
                     }
                 }
@@ -1038,6 +1046,7 @@ void
 ithread_join(...)
     PREINIT:
         ithread *thread;
+        ithread *current_thread;
         int join_err;
         AV *params;
         int len;
@@ -1045,6 +1054,7 @@ ithread_join(...)
 #ifdef WIN32
         DWORD waitcode;
 #else
+        int rc_join;
         void *retval;
 #endif
         dMY_POOL;
@@ -1054,42 +1064,56 @@ ithread_join(...)
             Perl_croak(aTHX_ "Usage: $thr->join()");
         }
 
-        /* Check if the thread is joinable */
+        /* Check if the thread is joinable and not ourselves */
         thread = S_SV_to_ithread(aTHX_ ST(0));
-        join_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED));
-        if (join_err) {
-            if (join_err & PERL_ITHR_DETACHED) {
-                Perl_croak(aTHX_ "Cannot join a detached thread");
-            } else {
-                Perl_croak(aTHX_ "Thread already joined");
-            }
+        current_thread = S_ithread_get(aTHX);
+
+        MUTEX_LOCK(&thread->mutex);
+        if ((join_err = (thread->state & PERL_ITHR_UNCALLABLE))) {
+            MUTEX_UNLOCK(&thread->mutex);
+            Perl_croak(aTHX_ (join_err & PERL_ITHR_DETACHED)
+                                ? "Cannot join a detached thread"
+                                : "Thread already joined");
+        } else if (thread->tid == current_thread->tid) {
+            MUTEX_UNLOCK(&thread->mutex);
+            Perl_croak(aTHX_ "Cannot join self");
         }
 
+        /* Mark as joined */
+        thread->state |= PERL_ITHR_JOINED;
+        MUTEX_UNLOCK(&thread->mutex);
+
+        MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
+        MY_POOL.joinable_threads--;
+        MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
+
         /* Join the thread */
 #ifdef WIN32
-        waitcode = WaitForSingleObject(thread->handle, INFINITE);
+        if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) {
+            /* Timeout/abandonment unexpected here; check $^E */
+            Perl_croak(aTHX_ "PANIC: underlying join failed");
+        };
 #else
-        pthread_join(thread->thr, &retval);
+        if ((rc_join = pthread_join(thread->thr, &retval)) != 0) {
+            /* In progress/deadlock/unknown unexpected here; check $! */
+            errno = rc_join;
+            Perl_croak(aTHX_ "PANIC: underlying join failed");
+        };
 #endif
 
         MUTEX_LOCK(&thread->mutex);
-        /* Mark as joined */
-        thread->state |= PERL_ITHR_JOINED;
-
         /* Get the return value from the call_sv */
         /* Objects do not survive this process - FIXME */
         {
             AV *params_copy;
             PerlInterpreter *other_perl;
             CLONE_PARAMS clone_params;
-            ithread *current_thread;
 
             params_copy = (AV *)SvRV(thread->params);
             other_perl = thread->interp;
             clone_params.stashes = newAV();
             clone_params.flags = CLONEf_JOIN_IN;
             PL_ptr_table = ptr_table_new();
-            current_thread = S_ithread_get(aTHX);
             S_ithread_set(aTHX_ thread);
             /* Ensure 'meaningful' addresses retain their meaning */
             ptr_table_store(PL_ptr_table, &other_perl->Isv_undef, &PL_sv_undef);
@@ -1109,12 +1133,6 @@ ithread_join(...)
         }
         MUTEX_UNLOCK(&thread->mutex);
 
-        MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
-        if (! (thread->state & PERL_ITHR_DETACHED)) {
-            MY_POOL.joinable_threads--;
-        }
-        MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
-
         /* Try to cleanup thread */
         S_ithread_destruct(aTHX_ thread);
 
@@ -1150,34 +1168,34 @@ ithread_detach(...)
     CODE:
         PERL_UNUSED_VAR(items);
 
-        /* Check if the thread is detachable */
-        thread = S_SV_to_ithread(aTHX_ ST(0));
-        if ((detach_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))) {
-            if (detach_err & PERL_ITHR_DETACHED) {
-                Perl_croak(aTHX_ "Thread already detached");
-            } else {
-                Perl_croak(aTHX_ "Cannot detach a joined thread");
-            }
-        }
-
         /* Detach the thread */
+        thread = S_SV_to_ithread(aTHX_ ST(0));
         MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
         MUTEX_LOCK(&thread->mutex);
-        thread->state |= PERL_ITHR_DETACHED;
+        if (! (detach_err = (thread->state & PERL_ITHR_UNCALLABLE))) {
+            /* Thread is detachable */
+            thread->state |= PERL_ITHR_DETACHED;
 #ifdef WIN32
-        /* Windows has no 'detach thread' function */
+            /* Windows has no 'detach thread' function */
 #else
-        PERL_THREAD_DETACH(thread->thr);
+            PERL_THREAD_DETACH(thread->thr);
 #endif
-        if (thread->state & PERL_ITHR_FINISHED) {
-            MY_POOL.joinable_threads--;
-        } else {
-            MY_POOL.running_threads--;
-            MY_POOL.detached_threads++;
+            if (thread->state & PERL_ITHR_FINISHED) {
+                MY_POOL.joinable_threads--;
+            } else {
+                MY_POOL.running_threads--;
+                MY_POOL.detached_threads++;
+            }
         }
         MUTEX_UNLOCK(&thread->mutex);
         MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
 
+        if (detach_err) {
+            Perl_croak(aTHX_ (detach_err & PERL_ITHR_DETACHED)
+                                ? "Thread already detached"
+                                : "Cannot detach a joined thread");
+        }
+
         /* If thread is finished and didn't die,
          * then we can free its interpreter */
         MUTEX_LOCK(&thread->mutex);
@@ -1272,6 +1290,7 @@ ithread_object(...)
         char *classname;
         UV tid;
         ithread *thread;
+        int state;
         int have_obj = 0;
         dMY_POOL;
     CODE:
@@ -1297,7 +1316,10 @@ ithread_object(...)
             /* Look for TID */
             if (thread->tid == tid) {
                 /* Ignore if detached or joined */
-                if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) {
+                MUTEX_LOCK(&thread->mutex);
+                state = thread->state;
+                MUTEX_UNLOCK(&thread->mutex);
+                if (! (state & PERL_ITHR_UNCALLABLE)) {
                     /* Put object on stack */
                     ST(0) = sv_2mortal(S_ithread_to_SV(aTHX_ Nullsv, thread, classname, TRUE));
                     have_obj = 1;
@@ -1377,7 +1399,9 @@ ithread_is_running(...)
         }
 
         thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
+        MUTEX_LOCK(&thread->mutex);
         ST(0) = (thread->state & PERL_ITHR_FINISHED) ? &PL_sv_no : &PL_sv_yes;
+        MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */
 
 
@@ -1388,7 +1412,9 @@ ithread_is_detached(...)
     CODE:
         PERL_UNUSED_VAR(items);
         thread = S_SV_to_ithread(aTHX_ ST(0));
+        MUTEX_LOCK(&thread->mutex);
         ST(0) = (thread->state & PERL_ITHR_DETACHED) ? &PL_sv_yes : &PL_sv_no;
+        MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */
 
 
@@ -1405,7 +1431,7 @@ ithread_is_joinable(...)
         thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
         MUTEX_LOCK(&thread->mutex);
         ST(0) = ((thread->state & PERL_ITHR_FINISHED) &&
-                 ! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))
+                 ! (thread->state & PERL_ITHR_UNCALLABLE))
             ? &PL_sv_yes : &PL_sv_no;
         MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */