[PEAK] Reactor-driven microthreads
Phillip J. Eby
pje at telecommunity.com
Wed Dec 31 15:34:03 EST 2003
At 01:12 PM 12/31/03 -0500, Phillip J. Eby wrote:
>For clarity, I've omitted the part where the generator in readlines()
>accumulates the data, pushes back partial lines, etc. As you can see, the
>only new concept we need is a "queue", or perhaps we should call it a
>"pipe". Actually, the concept is probably used often enough that it would
>be easier to do:
>
>     def readlines(self):
>         def genLines():
>             # ... Loop to accumulate a line
>             queue.put(line)
>         return Queue(self, run=genLines())
Or, even better, from an API perspective, we could define a 'queued'
wrapper as follows:
    from peak.util.advice import advice

    class queued(advice):

        __slots__ = ()

        def __call__(__advice, self, *__args, **__kw):
            # Create the queue the caller will receive, then start a
            # new "thread" running the wrapped generator, passing the
            # queue in as a hidden extra argument.
            queue = Queue(self)
            Thread(self).run(
                __advice._func(self, queue, *__args, **__kw)
            )
            return queue
and then do:
    def readlines(self, queue):
        # ... Loop to accumulate a line, yielding as needed
        queue.put(line)

    readlines = queued(readlines)
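From the calling side the extra argument never appears; here's a quick
sketch of a consumer (Queue's 'get()' accessor is a guess at the eventual
API):

    def processFile(self):
        lines = self.readlines()    # no queue argument visible here
        while True:
            yield lines             # suspend until a line is available
            print lines.get()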
The idea here is that 'queued' generator methods of components now do the
right thing, running in a new "thread" and receiving an extra argument for
the queue they communicate their results through. The 'queue' argument
isn't seen or passed by the caller, any more than one normally passes
'self'. Admittedly, this convention could be confusing. An alternative
would be to pass the queue *first*, so that queued methods are defined like
this:
    def readlines(queue, self, ...):
        # ...
This might provide a visible hint that something different is happening
here. The advice wrapper could also be extended to check that the wrapped
function defined 'self' as the second parameter at wrapping time.
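A minimal sketch of such a check, assuming the wrapper gets to see the raw
function at wrapping time (the helper name and error wording are mine):

    import inspect

    def checkQueuedSignature(func):
        # Hypothetical wrap-time check: require that a queued method
        # declares 'queue' first and 'self' second, so a misdeclared
        # method fails at class-definition time instead of at call time.
        args = inspect.getargspec(func)[0]
        if args[:2] != ['queue', 'self']:
            raise TypeError(
                "queued methods must be declared as"
                " method(queue, self, ...); got %r" % (args,)
            )
        return func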
Thoughts, anyone?
Actually, while on the subject of thoughts... any ideas what to *call*
this? I'd like to have it be convenient to say 'foo.Thread', 'foo.Queue',
etc. in the same way that we have binding, naming, running, logs,
etc. (Although in a4 'logs' will probably go away as an "API" framework,
because it no longer exposes even constants as an API.).
Anyway, I don't want to call it 'thread', 'threads', or 'threading', to
avoid confusion with similar Python modules or with "real"
threads. 'pthreads' (for pseudothreads) is out due to likely confusion
with the popular C-level 'pthread' package. I had been thinking 'mthread'
(for microthreads), but threads aren't even really all that central a
concept here. This is almost like "coroutines" or "co-operative
multitasking" or "event-driven programming".
So, maybe it could be 'coop.Thread', 'coop.Queue', etc., except those look
to me more like "coop" (as in chicken coop) than like co-op. 'multi' seems
too generic. Remaining candidates:
task
tasks
tasking
events
These are all pretty generic, though, and perhaps imply greater generality
than intended. 'events.Queue', 'events.Thread', and 'events.Sleep' all
read very nicely, although again they seem to give the package greater
scope than originally intended. That is, at some point PEAK is likely to
have various other kinds of "events" besides these. OTOH, one of the main
candidates for other kinds of events is GUI support, and this kind of
"threading" is just as useful/relevant in such an environment. Indeed,
anything that needs to respond to a sequence of "events" is likely to be
more easily expressed as a thread.
Any objections to adding 'peak.events' as a framework API? That is, 'from
peak.api import *' would include 'events', but 'from peak.core import *'
would not. The initial API it provides would probably include:
events.IThread
events.IEventSource (formerly IThreadScheduler)
events.Thread (implements IThread)
Event sources (implementing IEventSource):
events.Sleep(secs)       (triggers after 'secs' seconds)
events.Pause             (== Sleep(0))
events.AnyOf(*args)      (triggers when any of its args occur)
events.Sequence(*args)   (triggers after all of its args occur, in sequence)
events.Readable(fileno)  (triggers when file is readable)
events.Writable(fileno)  (triggers when file is writable)
events.Queue             (triggers when queue contains data)
events.Value             (triggers when value is set/changed)
events.Thread            (triggers when thread is finished running)
(and later, we might add GUI-based event sources as well)
resume() (I'm actually tempted to make this a primitive or at least export
it from peak.api)
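To make the intended usage concrete, here's a rough sketch of a microthread
written against this API ('sock' and 'someComponent' are stand-ins, and the
echo logic is illustrative only):

    def echoOnce(sock):
        # Each 'yield' hands an event source to the scheduler, which
        # resumes the generator when that source triggers.
        yield events.Readable(sock.fileno())   # suspend until readable
        data = sock.recv(1024)
        yield events.Writable(sock.fileno())   # suspend until writable
        sock.send(data)
        yield events.Sleep(5)                  # rest five seconds

    events.Thread(someComponent).run(echoOnce(sock))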
A lot of the work I've done lately on processes, timers, and loggers is
quite related; I expect that I could refactor many of their existing
"listener" mechanisms to use queues and event sources. Thus, measuring a
duration or invoking a log can be an event source that prompts threads to
be resumed. (Open issue: how to control/tell whether a given thread should
be resumed in the current or following reactor cycle, without adding an
explicit Pause.)
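As a very rough illustration of that listener refactoring (the
'lastMessage' Value and its call-to-read accessor are hypothetical):

    def watchLog(self, queue):
        # Hypothetical: 'lastMessage' is an events.Value that the
        # logging service sets on each published message; this thread
        # resumes on every change and forwards the message to its queue.
        while True:
            yield self.lastMessage           # suspend until value changes
            queue.put(self.lastMessage())    # calling to read is a guess

    watchLog = queued(watchLog)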
(You know, if Python could only pickle generator-iterators, we could
actually implement persistent workflows with this thing... Hm. Maybe with
a little help from C...)
It's almost sad how much work I put into the process supervisor system; so
many of its most convoluted aspects would be more cleanly represented as
threads. Heck, the entire peak.running.daemons scheduling system might be
better represented as threads that put themselves in the task queue after
sleeping between iterations. The task queue would be monitored by a thread
that then resumed whichever waiting thread had the highest priority. And
the whole thing would be a lot clearer, although the lack of try-finally
might be occasionally inconvenient.
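Sketching that daemons idea (the names 'taskQueue', 'doOneIteration',
'pollInterval', the per-task 'resumed' Value, and 'highestPriorityTask' are
all made up for illustration):

    def daemonTask(self, queue):
        # Hypothetical rewrite of a peak.running.daemons task: do one
        # unit of work, sleep out the polling interval, then wait in
        # the shared task queue until the dispatcher resumes us.
        while True:
            self.doOneIteration()
            yield events.Sleep(self.pollInterval)
            taskQueue.put(self)
            yield self.resumed               # a per-task events.Value

    def dispatcher(self, queue):
        # The monitoring thread: wake whenever the task queue has
        # entries, and resume the highest-priority waiting task.
        while True:
            yield taskQueue
            highestPriorityTask(taskQueue).resumed.set(True)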
*Very* interesting, indeed.