This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Improve internal threading API. Introduce win32/win32thread.[ch]
[perl5.git] / ext / Thread / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 /* Magic signature for Thread's mg_private is "Th" */ 
6 #define Thread_MAGIC_SIGNATURE 0x5468
7
8 static U32 threadnum = 0;
9 static int sig_pipe[2];
10
11 static void
12 remove_thread(t)
13 Thread t;
14 {
15     DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16                                    "%p: remove_thread %p\n", thr, t)));
17     MUTEX_LOCK(&threads_mutex);
18     MUTEX_DESTROY(&t->mutex);
19     nthreads--;
20     t->prev->next = t->next;
21     t->next->prev = t->prev;
22     COND_BROADCAST(&nthreads_cond);
23     MUTEX_UNLOCK(&threads_mutex);
24 }
25
26 static THREAD_RET_TYPE
27 threadstart(arg)
28 void *arg;
29 {
30 #ifdef FAKE_THREADS
31     Thread savethread = thr;
32     LOGOP myop;
33     dSP;
34     I32 oldscope = scopestack_ix;
35     I32 retval;
36     AV *returnav;
37     int i;
38
39     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
40                           thr, SvPEEK(TOPs)));
41     thr = (Thread) arg;
42     savemark = TOPMARK;
43     thr->prev = thr->prev_run = savethread;
44     thr->next = savethread->next;
45     thr->next_run = savethread->next_run;
46     savethread->next = savethread->next_run = thr;
47     thr->wait_queue = 0;
48     thr->private = 0;
49
50     /* Now duplicate most of perl_call_sv but with a few twists */
51     op = (OP*)&myop;
52     Zero(op, 1, LOGOP);
53     myop.op_flags = OPf_STACKED;
54     myop.op_next = Nullop;
55     myop.op_flags |= OPf_KNOW;
56     myop.op_flags |= OPf_WANT_LIST;
57     op = pp_entersub(ARGS);
58     DEBUG_L(if (!op)
59             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
60     /*
61      * When this thread is next scheduled, we start in the right
62      * place. When the thread runs off the end of the sub, perl.c
63      * handles things, using savemark to figure out how much of the
64      * stack is the return value for any join.
65      */
66     thr = savethread;           /* back to the old thread */
67     return 0;
68 #else
69     Thread thr = (Thread) arg;
70     LOGOP myop;
71     dSP;
72     I32 oldmark = TOPMARK;
73     I32 oldscope = scopestack_ix;
74     I32 retval;
75     AV *returnav;
76     int i, ret;
77     dJMPENV;
78
79     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
80     /*
81      * Wait until our creator releases us. If we didn't do this, then
82      * it would be potentially possible for out thread to carry on and
83      * do stuff before our creator fills in our "self" field. For example,
84      * if we went and created another thread which tried to JOIN with us,
85      * then we'd be in a mess.
86      */
87     MUTEX_LOCK(&thr->mutex);
88     MUTEX_UNLOCK(&thr->mutex);
89
90     /*
91      * It's safe to wait until now to set the thread-specific pointer
92      * from our pthread_t structure to our struct thread, since we're
93      * the only thread who can get at it anyway.
94      */
95     SET_THR(thr);
96
97     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
98     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
99                           thr, SvPEEK(TOPs)));
100
101     JMPENV_PUSH(ret);
102     switch (ret) {
103     case 3:
104         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
105         /* fall through */
106     case 1:
107         STATUS_ALL_FAILURE;
108         /* fall through */
109     case 2:
110         /* my_exit() was called */
111         while (scopestack_ix > oldscope)
112             LEAVE;
113         JMPENV_POP;
114         av_store(returnav, 0, newSViv(statusvalue));
115         goto finishoff;
116     }
117
118     /* Now duplicate most of perl_call_sv but with a few twists */
119     op = (OP*)&myop;
120     Zero(op, 1, LOGOP);
121     myop.op_flags = OPf_STACKED;
122     myop.op_next = Nullop;
123     myop.op_flags |= OPf_KNOW;
124     myop.op_flags |= OPf_WANT_LIST;
125     op = pp_entersub(ARGS);
126     if (op)
127         runops();
128     SPAGAIN;
129     retval = sp - (stack_base + oldmark);
130     sp = stack_base + oldmark + 1;
131     DEBUG_L(for (i = 1; i <= retval; i++)
132                 PerlIO_printf(PerlIO_stderr(),
133                               "%p returnav[%d] = %s\n",
134                               thr, i, SvPEEK(sp[i - 1]));)
135     returnav = newAV();
136     av_store(returnav, 0, newSVpv("", 0));
137     for (i = 1; i <= retval; i++, sp++)
138         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
139     
140   finishoff:
141 #if 0    
142     /* removed for debug */
143     SvREFCNT_dec(curstack);
144 #endif
145     SvREFCNT_dec(cvcache);
146     Safefree(markstack);
147     Safefree(scopestack);
148     Safefree(savestack);
149     Safefree(retstack);
150     Safefree(cxstack);
151     Safefree(tmps_stack);
152
153     MUTEX_LOCK(&thr->mutex);
154     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
155                           "%p: threadstart finishing: state is %u\n",
156                           thr, ThrSTATE(thr)));
157     switch (ThrSTATE(thr)) {
158     case THRf_R_JOINABLE:
159         ThrSETSTATE(thr, THRf_ZOMBIE);
160         MUTEX_UNLOCK(&thr->mutex);
161         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
162                               "%p: R_JOINABLE thread finished\n", thr));
163         break;
164     case THRf_R_JOINED:
165         ThrSETSTATE(thr, THRf_DEAD);
166         MUTEX_UNLOCK(&thr->mutex);
167         remove_thread(thr);
168         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
169                               "%p: R_JOINED thread finished\n", thr));
170         break;
171     case THRf_R_DETACHED:
172         ThrSETSTATE(thr, THRf_DEAD);
173         MUTEX_UNLOCK(&thr->mutex);
174         SvREFCNT_dec(returnav);
175         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
176                               "%p: DETACHED thread finished\n", thr));
177         remove_thread(thr);     /* This might trigger main thread to finish */
178         break;
179     default:
180         MUTEX_UNLOCK(&thr->mutex);
181         croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
182         /* NOTREACHED */
183     }
184     return THREAD_RET_CAST(returnav);   /* Available for anyone to join with */
185                                         /* us unless we're detached, in which */
186                                         /* case noone sees the value anyway. */
187 #endif    
188 }
189
190 static SV *
191 newthread(startsv, initargs, class)
192 SV *startsv;
193 AV *initargs;
194 char *class;
195 {
196     dTHR;
197     dSP;
198     Thread savethread;
199     int i;
200     SV *sv;
201     int err;
202 #ifndef THREAD_CREATE
203     sigset_t fullmask, oldmask;
204 #endif
205     
206     savethread = thr;
207     sv = newSVpv("", 0);
208     SvGROW(sv, sizeof(struct thread) + 1);
209     SvCUR_set(sv, sizeof(struct thread));
210     thr = (Thread) SvPVX(sv);
211     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n",
212                           savethread, SvPEEK(startsv), thr));
213     oursv = sv; 
214     /* If we don't zero these foostack pointers, init_stacks won't init them */
215     markstack = 0;
216     scopestack = 0;
217     savestack = 0;
218     retstack = 0;
219     init_stacks(ARGS);
220     curcop = savethread->Tcurcop;       /* XXX As good a guess as any? */
221     SPAGAIN;
222     defstash = savethread->Tdefstash;   /* XXX maybe these should */
223     curstash = savethread->Tcurstash;   /* always be set to main? */
224     /* top_env? */
225     /* runlevel */
226     cvcache = newHV();
227     thr->flags = THRf_R_JOINABLE;
228     MUTEX_INIT(&thr->mutex);
229     thr->tid = ++threadnum;
230     /* Insert new thread into the circular linked list and bump nthreads */
231     MUTEX_LOCK(&threads_mutex);
232     thr->next = savethread->next;
233     thr->prev = savethread;
234     savethread->next = thr;
235     thr->next->prev = thr;
236     nthreads++;
237     MUTEX_UNLOCK(&threads_mutex);
238
239     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
240                           "%p: newthread, tid is %u, preparing stack\n",
241                           savethread, thr->tid));
242     /* The following pushes the arg list and startsv onto the *new* stack */
243     PUSHMARK(sp);
244     /* Could easily speed up the following greatly */
245     for (i = 0; i <= AvFILL(initargs); i++)
246         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
247     XPUSHs(SvREFCNT_inc(startsv));
248     PUTBACK;
249
250 #ifdef THREAD_CREATE
251     THREAD_CREATE(thr, threadstart);
252 #else    
253     /* On your marks... */
254     MUTEX_LOCK(&thr->mutex);
255     /* Get set...  */
256     sigfillset(&fullmask);
257     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
258         croak("panic: sigprocmask");
259     err = pthread_create(&self, pthread_attr_default, threadstart, (void*) thr);
260     /* Go */
261     MUTEX_UNLOCK(&thr->mutex);
262 #endif
263     if (err) {
264         /* Thread creation failed--clean up */
265         SvREFCNT_dec(cvcache);
266         remove_thread(thr);
267         MUTEX_DESTROY(&thr->mutex);
268         for (i = 0; i <= AvFILL(initargs); i++)
269             SvREFCNT_dec(*av_fetch(initargs, i, FALSE));
270         SvREFCNT_dec(startsv);
271         return NULL;
272     }
273 #ifdef THREAD_POST_CREATE
274     THREAD_POST_CREATE(thr);
275 #else
276     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
277         croak("panic: sigprocmask");
278 #endif
279     sv = newSViv(thr->tid);
280     sv_magic(sv, oursv, '~', 0, 0);
281     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
282     return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
283 }
284
285 static Signal_t
286 handle_thread_signal(sig)
287 int sig;
288 {
289     char c = (char) sig;
290     write(sig_pipe[0], &c, 1);
291 }
292
293 MODULE = Thread         PACKAGE = Thread
294
295 void
296 new(class, startsv, ...)
297         char *          class
298         SV *            startsv
299         AV *            av = av_make(items - 2, &ST(2));
300     PPCODE:
301         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
302
303 void
304 join(t)
305         Thread  t
306         AV *    av = NO_INIT
307         int     i = NO_INIT
308     PPCODE:
309         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
310                               thr, t, ThrSTATE(t)););
311         MUTEX_LOCK(&t->mutex);
312         switch (ThrSTATE(t)) {
313         case THRf_R_JOINABLE:
314         case THRf_R_JOINED:
315             ThrSETSTATE(t, THRf_R_JOINED);
316             MUTEX_UNLOCK(&t->mutex);
317             break;
318         case THRf_ZOMBIE:
319             ThrSETSTATE(t, THRf_DEAD);
320             MUTEX_UNLOCK(&t->mutex);
321             remove_thread(t);
322             break;
323         default:
324             MUTEX_UNLOCK(&t->mutex);
325             croak("can't join with thread");
326             /* NOTREACHED */
327         }
328         JOIN(t, &av);
329
330         /* Could easily speed up the following if necessary */
331         for (i = 0; i <= AvFILL(av); i++)
332             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
333
334 void
335 detach(t)
336         Thread  t
337     CODE:
338         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
339                               thr, t, ThrSTATE(t)););
340         MUTEX_LOCK(&t->mutex);
341         switch (ThrSTATE(t)) {
342         case THRf_R_JOINABLE:
343             ThrSETSTATE(t, THRf_R_DETACHED);
344             /* fall through */
345         case THRf_R_DETACHED:
346             DETACH(t);
347             MUTEX_UNLOCK(&t->mutex);
348             break;
349         case THRf_ZOMBIE:
350             ThrSETSTATE(t, THRf_DEAD);
351             DETACH(t);
352             MUTEX_UNLOCK(&t->mutex);
353             remove_thread(t);
354             break;
355         default:
356             MUTEX_UNLOCK(&t->mutex);
357             croak("can't detach thread");
358             /* NOTREACHED */
359         }
360
361 void
362 equal(t1, t2)
363         Thread  t1
364         Thread  t2
365     PPCODE:
366         PUSHs((t1 == t2) ? &sv_yes : &sv_no);
367
368 void
369 flags(t)
370         Thread  t
371     PPCODE:
372         PUSHs(sv_2mortal(newSViv(t->flags)));
373
374 void
375 self(class)
376         char *  class
377     PREINIT:
378         SV *sv;
379     PPCODE:
380         sv = newSViv(thr->tid);
381         sv_magic(sv, oursv, '~', 0, 0);
382         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
383         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
384
385 U32
386 tid(t)
387         Thread  t
388     CODE:
389         MUTEX_LOCK(&t->mutex);
390         RETVAL = t->tid;
391         MUTEX_UNLOCK(&t->mutex);
392     OUTPUT:
393         RETVAL
394
395 void
396 DESTROY(t)
397         SV *    t
398     PPCODE:
399         PUSHs(&sv_yes);
400
401 void
402 yield()
403     CODE:
404         YIELD;
405
406 void
407 cond_wait(sv)
408         SV *    sv
409         MAGIC * mg = NO_INIT
410 CODE:
411         if (SvROK(sv))
412             sv = SvRV(sv);
413
414         mg = condpair_magic(sv);
415         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
416         MUTEX_LOCK(MgMUTEXP(mg));
417         if (MgOWNER(mg) != thr) {
418             MUTEX_UNLOCK(MgMUTEXP(mg));
419             croak("cond_wait for lock that we don't own\n");
420         }
421         MgOWNER(mg) = 0;
422         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
423         while (MgOWNER(mg))
424             COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
425         MgOWNER(mg) = thr;
426         MUTEX_UNLOCK(MgMUTEXP(mg));
427         
428 void
429 cond_signal(sv)
430         SV *    sv
431         MAGIC * mg = NO_INIT
432 CODE:
433         if (SvROK(sv))
434             sv = SvRV(sv);
435
436         mg = condpair_magic(sv);
437         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
438         MUTEX_LOCK(MgMUTEXP(mg));
439         if (MgOWNER(mg) != thr) {
440             MUTEX_UNLOCK(MgMUTEXP(mg));
441             croak("cond_signal for lock that we don't own\n");
442         }
443         COND_SIGNAL(MgCONDP(mg));
444         MUTEX_UNLOCK(MgMUTEXP(mg));
445
446 void
447 cond_broadcast(sv)
448         SV *    sv
449         MAGIC * mg = NO_INIT
450 CODE:
451         if (SvROK(sv))
452             sv = SvRV(sv);
453
454         mg = condpair_magic(sv);
455         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
456                               thr, sv));
457         MUTEX_LOCK(MgMUTEXP(mg));
458         if (MgOWNER(mg) != thr) {
459             MUTEX_UNLOCK(MgMUTEXP(mg));
460             croak("cond_broadcast for lock that we don't own\n");
461         }
462         COND_BROADCAST(MgCONDP(mg));
463         MUTEX_UNLOCK(MgMUTEXP(mg));
464
465 void
466 list(class)
467         char *  class
468     PREINIT:
469         Thread  t;
470         AV *    av;
471         SV **   svp;
472         int     n = 0;
473     PPCODE:
474         av = newAV();
475         /*
476          * Iterate until we have enough dynamic storage for all threads.
477          * We mustn't do any allocation while holding threads_mutex though.
478          */
479         MUTEX_LOCK(&threads_mutex);
480         do {
481             n = nthreads;
482             MUTEX_UNLOCK(&threads_mutex);
483             if (AvFILL(av) < n - 1) {
484                 int i = AvFILL(av);
485                 for (i = AvFILL(av); i < n - 1; i++) {
486                     SV *sv = newSViv(0);        /* fill in tid later */
487                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
488                     av_push(av, sv_bless(newRV_noinc(sv),
489                                          gv_stashpv(class, TRUE)));
490         
491                 }
492             }
493             MUTEX_LOCK(&threads_mutex);
494         } while (n < nthreads);
495         n = nthreads;   /* Get the final correct value */
496
497         /*
498          * At this point, there's enough room to fill in av.
499          * Note that we are holding threads_mutex so the list
500          * won't change out from under us but all the remaining
501          * processing is "fast" (no blocking, malloc etc.)
502          */
503         t = thr;
504         svp = AvARRAY(av);
505         do {
506             SV *sv = (SV*)SvRV(*svp);
507             sv_setiv(sv, t->tid);
508             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
509             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
510             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
511             t = t->next;
512             svp++;
513         } while (t != thr);
514         /*  */
515         MUTEX_UNLOCK(&threads_mutex);
516         /* Truncate any unneeded slots in av */
517         av_fill(av, n - 1);
518         /* Finally, push all the new objects onto the stack and drop av */
519         EXTEND(sp, n);
520         for (svp = AvARRAY(av); n > 0; n--, svp++)
521             PUSHs(*svp);
522         (void)sv_2mortal((SV*)av);
523
524
525 MODULE = Thread         PACKAGE = Thread::Signal
526
527 void
528 kill_sighandler_thread()
529     PPCODE:
530         write(sig_pipe[0], "\0", 1);
531         PUSHs(&sv_yes);
532
533 void
534 init_thread_signals()
535     PPCODE:
536         sighandlerp = handle_thread_signal;
537         if (pipe(sig_pipe) == -1)
538             XSRETURN_UNDEF;
539         PUSHs(&sv_yes);
540
541 SV *
542 await_signal()
543     PREINIT:
544         char c;
545         SSize_t ret;
546     CODE:
547         do {
548             ret = read(sig_pipe[1], &c, 1);
549         } while (ret == -1 && errno == EINTR);
550         if (ret == -1)
551             croak("panic: await_signal");
552         if (ret == 0)
553             XSRETURN_UNDEF;
554         RETVAL = c ? psig_ptr[c] : &sv_no;
555     OUTPUT:
556         RETVAL