This is a live mirror of the Perl 5 development currently hosted at https://github.com/perl/perl5
Initial check-in of Thread module.
[perl5.git] / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 typedef struct condpair {
6     pthread_mutex_t     mutex;
7     pthread_cond_t      cond;
8     Thread              owner;
9 } condpair_t;
10
11 AV *condpair_table;
12 typedef SSize_t Thread__Cond;
13
14 static void *
15 threadstart(arg)
16 void *arg;
17 {
18     Thread thr = (Thread) arg;
19     LOGOP myop;
20     dSP;
21     I32 oldmark = TOPMARK;
22     I32 oldscope = scopestack_ix;
23     I32 retval;
24     AV *returnav = newAV();
25     int i;
26     
27     /*
28      * Wait until our creator releases us. If we didn't do this, then
29      * it would be potentially possible for out thread to carry on and
30      * do stuff before our creator fills in our "self" field. For example,
31      * if we went and created another thread which tried to pthread_join
32      * with us, then we'd be in a mess.
33      */
34     MUTEX_LOCK(threadstart_mutexp);
35     MUTEX_UNLOCK(threadstart_mutexp);
36     MUTEX_DESTROY(threadstart_mutexp);  /* don't need it any more */
37     Safefree(threadstart_mutexp);
38
39     DEBUG_L(fprintf(stderr, "new thread 0x%lx starting at %s\n",
40                     (unsigned long) thr, SvPEEK(TOPs)));
41     /*
42      * It's safe to wait until now to set the thread-specific pointer
43      * from our pthread_t structure to our struct thread, since we're
44      * the only thread who can get at it anyway.
45      */
46     if (pthread_setspecific(thr_key, (void *) thr))
47         croak("panic: pthread_setspecific");
48
49     switch (Sigsetjmp(top_env,1)) {
50       case 3:
51         fprintf(stderr, "panic: top_env\n");
52         /* fall through */
53       case 1:
54 #ifdef VMS
55         statusvalue = 255;
56 #else
57         statusvalue = 1;
58 #endif
59         /* fall through */
60       case 2:
61         av_store(returnav, 0, newSViv(statusvalue));
62         goto finishoff;
63     }
64
65     /* Now duplicate most of perl_call_sv but with a few twists */
66     op = (OP*)&myop;
67     Zero(op, 1, LOGOP);
68     myop.op_flags = OPf_STACKED;
69     myop.op_next = Nullop;
70     myop.op_flags |= OPf_KNOW;
71     myop.op_flags |= OPf_LIST;
72     op = pp_entersub(ARGS);
73     if (op)
74         runops();
75     retval = stack_sp - (stack_base + oldmark);
76     sp -= retval;
77     av_store(returnav, 0, newSVpv("", 0));
78     for (i = 1; i <= retval; i++)
79         sv_setsv(*av_fetch(returnav, i, TRUE), *sp++);
80
81   finishoff:
82     SvREFCNT_dec(stack);
83     SvREFCNT_dec(cvcache);
84     Safefree(markstack);
85     Safefree(scopestack);
86     Safefree(savestack);
87     Safefree(retstack);
88     Safefree(cxstack);
89     Safefree(tmps_stack);
90
91     return (void *) returnav;   /* Available for anyone to join with us */
92 }
93
94 Thread
95 newthread(startsv, initargs)
96 SV *startsv;
97 AV *initargs;
98 {
99     dTHR;
100     dSP;
101     Thread savethread;
102     int i;
103     
104     savethread = thr;
105     New(53, thr, 1, struct thread);
106     init_stacks(ARGS);
107     SPAGAIN;
108     defstash = savethread->Tdefstash;   /* XXX maybe these should */
109     curstash = savethread->Tcurstash;   /* always be set to main? */
110     mainstack = stack;
111     /* top_env? */
112     /* runlevel */
113     cvcache = newHV();
114
115     /* The following pushes the arg list and startsv onto the *new* stack */
116     PUSHMARK(sp);
117     /* Could easily speed up the following greatly */
118     for (i = 0; i < AvFILL(initargs); i++)
119         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
120     XPUSHs(SvREFCNT_inc(startsv));
121     PUTBACK;
122
123     New(53, threadstart_mutexp, 1, pthread_mutex_t);
124     /* On your marks... */
125     MUTEX_INIT(threadstart_mutexp);
126     MUTEX_LOCK(threadstart_mutexp);
127     /* Get set...
128      * Increment the global thread count. It is decremented
129      * by the destructor for the thread specific key thr_key.
130      */
131     MUTEX_LOCK(&nthreads_mutex);
132     nthreads++;
133     MUTEX_UNLOCK(&nthreads_mutex);
134     if (pthread_create(&self, NULL, threadstart, (void*) thr))
135         return NULL;    /* XXX should clean up first */
136     /* Go */
137     MUTEX_UNLOCK(threadstart_mutexp);
138     return thr;
139 }
140
141 void condpair_kick(SSize_t cond, SV *code, int broadcast_flag) {
142     condpair_t *condp;
143     HV *hvp;
144     GV *gvp;
145     CV *cv = sv_2cv(code, &hvp, &gvp, FALSE); 
146     SV *sv = *av_fetch(condpair_table, cond, TRUE);
147     dTHR;       
148
149     if (!SvOK(sv))
150         croak("bad Cond object argument");
151     condp = (condpair_t *) SvPVX(sv);
152     /* Get ownership of condpair object */
153     MUTEX_LOCK(&condp->mutex);
154     while (condp->owner && condp->owner != thr)
155         COND_WAIT(&condp->cond, &condp->mutex);
156     if (condp->owner == thr) {
157         MUTEX_UNLOCK(&condp->mutex);
158         croak("Recursing in Thread::Cond::waituntil");
159     }
160     condp->owner = thr;
161     MUTEX_UNLOCK(&condp->mutex);
162     /* We now own the condpair object */
163     perl_call_sv(code, G_SCALAR|G_NOARGS|G_DISCARD|G_EVAL);
164     /* Release condpair object */
165     MUTEX_LOCK(&condp->mutex);
166     condp->owner = 0;
167     /* Signal or Broadcast condpair */
168     if (broadcast_flag)
169         COND_BROADCAST(&condp->cond);
170     else
171         COND_SIGNAL(&condp->cond);
172     MUTEX_UNLOCK(&condp->mutex);
173     /* Check we don't need to propagate a die */
174     sv = GvSV(gv_fetchpv("@", TRUE, SVt_PV));
175     if (SvTRUE(sv))
176         croak(SvPV(sv, na));
177 }    
178
179 static SV *
180 fast(sv)
181 SV *sv;
182 {
183     HV *hvp;
184     GV *gvp;
185     CV *cv = sv_2cv(sv, &hvp, &gvp, FALSE);
186
187     if (!cv)
188         croak("Not a CODE reference");
189     if (CvCONDP(cv)) {
190         COND_DESTROY(CvCONDP(cv));
191         Safefree(CvCONDP(cv));
192         CvCONDP(cv) = 0;
193     }
194     return sv;
195 }
196
197 MODULE = Thread         PACKAGE = Thread
198
199 Thread
200 new(class, startsv, ...)
201         SV *            class
202         SV *            startsv
203         AV *            av = av_make(items - 1, &ST(2));
204     CODE:
205         RETVAL = newthread(startsv, av);
206     OUTPUT:
207         RETVAL
208
209 void
210 sync(sv)
211         SV *    sv
212         HV *    hvp = NO_INIT
213         GV *    gvp = NO_INIT
214     CODE:
215         SvFLAGS(sv_2cv(sv, &hvp, &gvp, FALSE)) |= SVpcv_SYNC;
216         ST(0) = sv_mortalcopy(sv);
217
218 void
219 fast(sv)
220         SV *    sv
221     CODE:
222         ST(0) = sv_mortalcopy(fast(sv));
223
224 void
225 join(t)
226         Thread  t
227         AV *    av = NO_INIT
228         int     i = NO_INIT
229     PPCODE:
230         if (pthread_join(t->Tself, (void **) &av))
231             croak("pthread_join failed");
232         /* Could easily speed up the following if necessary */
233         for (i = 0; i <= AvFILL(av); i++)
234             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
235
236 void
237 yield(t)
238         Thread  t
239     CODE:
240         pthread_yield();
241
242 MODULE = Thread         PACKAGE = Thread::Cond
243
244 Thread::Cond
245 new(class)
246         char *          class
247         SV *            sv = NO_INIT
248         condpair_t *    condp = NO_INIT
249     CODE:
250         if (!condpair_table)
251             condpair_table = newAV();
252         sv = newSVpv("", 0);
253         sv_grow(sv, sizeof(condpair_t));
254         condp = (condpair_t *) SvPVX(sv);
255         MUTEX_INIT(&condp->mutex);
256         COND_INIT(&condp->cond);
257         condp->owner = 0;
258         av_push(condpair_table, sv);
259         RETVAL = AvFILL(condpair_table);
260     OUTPUT:
261         RETVAL
262
263 void
264 waituntil(cond, code)
265         Thread::Cond    cond
266         SV *            code
267         SV *            sv = NO_INIT
268         condpair_t *    condp = NO_INIT
269         HV *            hvp = NO_INIT
270         GV *            gvp = NO_INIT
271         CV *            cv = sv_2cv(code, &hvp, &gvp, FALSE); 
272         I32             count = NO_INIT
273     CODE:
274         sv = *av_fetch(condpair_table, cond, TRUE);
275         if (!SvOK(sv))
276             croak("bad Cond object argument");
277         condp = (condpair_t *) SvPVX(sv);
278         do {
279             /* Get ownership of condpair object */
280             MUTEX_LOCK(&condp->mutex);
281             while (condp->owner && condp->owner != thr)
282                 COND_WAIT(&condp->cond, &condp->mutex);
283             if (condp->owner == thr) {
284                 MUTEX_UNLOCK(&condp->mutex);
285                 croak("Recursing in Thread::Cond::waituntil");
286             }
287             condp->owner = thr;
288             MUTEX_UNLOCK(&condp->mutex);
289             /* We now own the condpair object */
290             count = perl_call_sv(code, G_SCALAR|G_NOARGS|G_EVAL);
291             SPAGAIN;
292             /* Release condpair object */
293             MUTEX_LOCK(&condp->mutex);
294             condp->owner = 0;
295             MUTEX_UNLOCK(&condp->mutex);
296             /* See if we need to go round again */      
297             if (count == 0)
298                 croak(SvPV(GvSV(gv_fetchpv("@", TRUE, SVt_PV)), na));
299             else if (count > 1)
300                 croak("waituntil code returned more than one value");
301             sv = POPs;
302             PUTBACK;
303         } while (!SvTRUE(sv));
304         ST(0) = sv_mortalcopy(sv);
305
306 void
307 signal(cond, code)
308         Thread::Cond    cond
309         SV *            code
310     CODE:
311         condpair_kick(cond, code, 0);
312
313 void
314 broadcast(cond, code)
315         Thread::Cond    cond
316         SV *            code
317     CODE:
318         condpair_kick(cond, code, 1);
319