[PEAK] Reactor-driven microthreads
Phillip J. Eby
pje at telecommunity.com
Wed Dec 31 00:22:11 EST 2003
It seems like I've been dealing with various kinds of reactive systems a lot
lately, using PEAK. Although they were easier to write than they would've
been without PEAK, I think they're not quite "easy enough" yet.
Twisted makes reactivity the center of the world, which to me is the tail
wagging the dog. Most application code -- and programmer thought -- is
based on linear and iterative patterns. It would be nice to be able to
leverage one's linear-thinking skills to write reactive code. So nice, in
fact, that it would be worth a little runtime overhead to be able to write
code faster and with fewer defects.
Regular threads are not the answer, though, as preemptive multitasking is
prone to far subtler bugs. So, a co-operative multitasking mechanism is
needed.
Python generator functions provide an ideal basis for such a
mechanism. Indeed, Twisted has its 'flow' package to do something similar
to what I have in mind. However, even 'flow' has some "twisted"
assumptions that don't really suit PEAK. Even in 'flow', the reactive
paradigm still seems quite central, and generators are used to make the
existing Twisted framework more usable in certain areas.
But, a lot of the things the Twisted framework offers, like protocols and
connectors and the like, are primarily necessary because Twisted is
event-driven in the first place! If it were based on co-operative
multitasking, much of that complexity would go away. Consider the relative
simplicity of:
    def talkToAServer(self, address):
        sock = self.lookupComponent(address, adaptTo=net.IClientAsyncSocket)
        readline = sock.readline
        try:
            yield sock.untilConnected(timeout=self.connTimeout)
            yield sock.send("HELO world\n")
            yield sock.untilLineRead(timeout=self.lineTimeout)
            data = readline()
            # do something with the data
        except sock.TimeoutError:
            yield mthread.Error()
            sock.close()
            return
        sock.close()
Compared to most client code I've seen for Twisted, this is pretty darn simple.
Anyway, the basic idea is that you write generators that yield the results
of calling methods that figuratively "wait" for something to happen. To
use the above code, something might call:
    mthread.Thread(self.talkToAServer(address)).run()
The Thread wrapper handles iterating over the generator it's passed, and
manages scheduling. In essence a Thread calls
'reactor.callLater(0,thread.iterate)' to reschedule itself if code has
co-operatively yielded, e.g. via 'yield None'. If an 'IThreadScheduler' is
yielded, though, the Thread passes itself to the 'IThreadScheduler'
instance to request scheduling. A scheduler can do things like add a
reader to the reactor, schedule a delayed call to resume the thread, or
"push" a new generator on top of the Thread's "stack", so that the Thread
then runs the other generator. In this way, lines like 'yield
sock.send("HELO world\n")' may effectively transfer control to another
generator. That is, 'sock.send()' could be another generator loop,
something like:
    def send(self, data, timeout=None):
        sentBytes = self._realsocket.send(data)
        while sentBytes < len(data):
            yield self.untilWritable(timeout)
            sentBytes += self._realsocket.send(data[sentBytes:])
'untilWritable' would return an 'IThreadScheduler' that adds a writer to
the reactor that re-enables the Thread that's executing the generator.
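Just to make the mechanics concrete, here's a rough first cut at what the
Thread itself might look like. The scheduleFor() and resume() names are
provisional, and having the Thread push yielded generators directly (rather
than a scheduler doing it) is just one way to slice it:

    from twisted.internet import reactor

    class Thread:
        def __init__(self, generator):
            self.stack = [generator]    # generators being run, innermost last

        def run(self):
            self.iterate()

        def resume(self):
            # called by a scheduler when the awaited event has occurred
            self.iterate()

        def iterate(self):
            while self.stack:
                try:
                    yielded = self.stack[-1].next()   # generator protocol
                except StopIteration:
                    self.stack.pop()    # generator finished; resume its caller
                    continue
                if yielded is None:
                    # plain co-operative yield: reschedule ourselves
                    reactor.callLater(0, self.iterate)
                elif hasattr(yielded, 'scheduleFor'):
                    # an IThreadScheduler: let it arrange our wakeup
                    yielded.scheduleFor(self)
                else:
                    # assume a nested generator: push it and keep iterating
                    self.stack.append(yielded)
                    continue
                return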
Under this scheme, the current process supervisor would be able to ditch a
lot of state-driven code in favor of something like:
    processCount = binding.Make(mthread.Value)
    desiredProcesses = binding.Make(mthread.Value)

    def _ensureProcessesRunning(self):
        while True:
            if self.processCount() < self.desiredProcesses():
                # Start a process as soon as possible
                self.reactor.callLater(0, self._doStart)
                # But don't do any more scheduling until start interval passes
                yield mthread.sleep(self.startInterval)
            else:
                # We have enough processes, so reset desired = minimum
                self.desiredProcesses.set(self.minChildren)
                # And sleep until something relevant changes
                yield self.processCount.changed() | \
                      self.desiredProcesses.changed()
And in the process supervisor's startup process, it would do:
    self.processCount.set(0)
    self.desiredProcesses.set(self.minProcesses)
    Thread(self._ensureProcessesRunning()).run()
Finally, in methods that added or removed child processes, we would update
the process count variable, and in methods that wanted more children
running, we would bump up 'desiredProcesses' (but no higher than maxChildren).
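For example, something like this hypothetical helper:

    def needMoreChildren(self, n=1):
        # ask for more children, but never more than maxChildren
        self.desiredProcesses.set(
            min(self.desiredProcesses() + n, self.maxChildren)
        )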
The simple loop above took much less time to write than the code that does
this now in the supervisor tool, and it's easier to tell if it's
correct. To implement it would require only the Thread and Watchable
classes, and a Union class (to implement the '|' operator over
IThreadSchedulers).
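Roughly something like this, again with provisional names and internals:

    class Scheduler:
        """Base for IThreadScheduler sketches; gives every scheduler '|'."""
        def __or__(self, other):
            return Union(self, other)
        def scheduleFor(self, thread):
            raise NotImplementedError

    class Union(Scheduler):
        """Resume the thread when any member scheduler fires."""
        def __init__(self, *schedulers):
            self.schedulers = schedulers
        def __or__(self, other):
            # 'a | b | c' flattens into a single Union
            return Union(*(self.schedulers + (other,)))
        def scheduleFor(self, thread):
            # NB: cancelling the losers once one member fires is a
            # detail left out here
            for scheduler in self.schedulers:
                scheduler.scheduleFor(thread)

    class _Changed(Scheduler):
        """Fires the next time its Value is set()."""
        def __init__(self, value):
            self.value = value
        def scheduleFor(self, thread):
            self.value._waiters.append(thread.resume)

    class Value:
        """A watchable value: call it to read, set() to write and notify."""
        def __init__(self, val=None):
            self._value = val
            self._waiters = []
        def __call__(self):
            return self._value
        def set(self, val):
            self._value = val
            waiters, self._waiters = self._waiters, []
            for resume in waiters:
                resume()
        def changed(self):
            return _Changed(self)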
My main concern about this concept is that it is likely to do an awful lot
of object creation and function calls, compared to the event-driven
approach. OTOH, for threads like the last one, those objects are created
only when something relevant happens, so maybe it doesn't really do
anything more than the event-driven approach would. And, if operations
like Unions, Values, and the rest are called a lot, then it might be worth
porting them to Pyrex. For objects like sockets and file handles, we could
avoid creating new objects for each read or write by having "reader" and
"writer" IThreadScheduler objects that live on the wrapper, as long as we
were willing to have timeouts be a socket-level configuration rather than a
per-read or per-write setting.
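Something like this, with a Twisted-style reactor (addWriter() and
removeWriter(), as in Twisted's file-descriptor interface) and the same
provisional scheduleFor()/resume() protocol as above:

    from twisted.internet import reactor

    class _WriteScheduler:
        """One long-lived scheduler per socket; nothing allocated per write."""
        def __init__(self, sock):
            self.sock = sock
            self.thread = None

        def scheduleFor(self, thread):
            self.thread = thread
            reactor.addWriter(self)

        def fileno(self):
            return self.sock._realsocket.fileno()

        def doWrite(self):
            # the reactor says the socket is writable: wake the thread
            reactor.removeWriter(self)
            thread, self.thread = self.thread, None
            thread.resume()

    class AsyncSocketWrapper:
        def __init__(self, realsocket):
            self._realsocket = realsocket
            self._writer = _WriteScheduler(self)    # created exactly once

        def untilWritable(self):
            # no per-call timeout: timeouts become socket-level
            # configuration in this variant
            return self._writer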
Similarly, Value objects could have their 'changed' attribute be an
IThreadScheduler, so you'd yield the 'changed' object. Hm. So that loop
above would become:
    def _ensureProcessesRunning(self):
        processCount = self.processCount
        desiredProcs = self.desiredProcesses
        until_something_changed = processCount.changed | desiredProcs.changed
        while True:
            if processCount() < desiredProcs():
                # Start a process as soon as possible
                self.reactor.callLater(0, self._doStart)
                # But don't do any more scheduling until start interval passes
                yield mthread.sleep(self.startInterval)
            else:
                # We have enough processes, so reset desired = minimum
                self.desiredProcesses.set(self.minChildren)
                # And sleep until something relevant changes
                yield until_something_changed
And now the only objects being created are for the callLater() and sleep()
invocations.
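Continuing the Value sketch from before, the only change under this variant
is that the scheduler is created once, in the constructor:

    class Value:
        def __init__(self, val=None):
            self._value = val
            self._waiters = []
            # one scheduler for the Value's whole lifetime; yielding it
            # repeatedly just re-registers the current thread
            self.changed = _Changed(self)

(When a Union fires, the registrations left with its other members would
need cancelling, or be tolerated as spurious wakeups; a detail to sort out.)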
Exceptions are an interesting problem. A scheduler like 'sleep()' can
throw an exception in the generator where it's called. But an error in the
scheduler itself is akin to an uncaught exception in a "real" thread: it
may force termination of the thread. Alas, I don't know of a way to push
such exceptions back into the generator in the general case. For things
like read operations on a socket, you have to call something when execution
resumes, so of course there you can have an error.
It may be that instead of doing:

    Thread(self._ensureProcessesRunning()).run()

one does something like:

    Thread().run(self._ensureProcessesRunning, *args, **kw)
And the Thread() object passes itself to the generator. Then the generator
itself would be able to do things like 'thread.errors()' to force any
errors to be thrown in the current execution. Ugh. It's too bad there's
no way to "throw" errors "into" a generator, without having to call
something from inside the generator that holds the exception in order to
reraise it. Bleah.
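To make the workaround concrete, errors() might look something like this;
the _pending attribute, and the idea that schedulers record errors rather
than raise them, are provisional details:

    class Thread:
        # ...continuing the earlier sketch; a scheduler failure would be
        # recorded in _pending for the generator to pick up later
        _pending = None

        def errors(self):
            # called from *inside* a generator to surface scheduler errors
            if self._pending is not None:
                error, self._pending = self._pending, None
                raise error

so that threaded code would do:

    yield mthread.sleep(self.startInterval)
    thread.errors()    # re-raise anything that went wrong while sleeping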
I guess we'll have to do some thinking about how exceptions are handled in
Threads for this to work, and consider what errors *need* to be seen by
Thread-generators, versus what errors should terminate a Thread, or be
simply caught-and-logged by a handler. This is one of those unfortunate
coding areas where errors simply *must* be explicitly silenced. On the
plus side, this applies almost exclusively to IThreadSchedulers themselves,
not to threaded generators.
Well, it's getting awfully late, so I think I'll wait for another day to
sort out all the details of exception handling. But it sounds like the
only *really* open issue here is how schedulers' errors can be passed back
into the generators in a relatively transparent fashion. Probably, that
means that we'd end up with statements like:
    yield thread(until_something_changed)

or:

    yield until_something_changed(thread)
as either of these would let the thread object invoke the scheduler (or
vice versa) while still "in" the generator's execution. So any errors
would be thrown in the original context. Then, only errors occurring when
the thread is "resumed" would have to be checked for inside the generator.
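For example, the first spelling might work with a callable Thread and a
sentinel return value (provisional, of course):

    SUSPEND = object()   # tells iterate(): wakeup already arranged, suspend

    class Thread:
        # ...continuing the earlier sketch...
        def __call__(self, scheduler):
            # This runs *before* the yield, in the generator's own frame,
            # so an error raised by the scheduler propagates right at the
            # yield statement, where the generator's try/except can see it.
            scheduler.scheduleFor(self)
            return SUSPEND

iterate() would then treat a yielded SUSPEND as "do nothing; a resume() is
already on the way."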
Hm. This seems like the first really good use case I've seen for having a
macro facility in Python, since it would allow us to spell the
yield+errorcheck combination with less boilerplate. Ah well.