In this quite old paper, it was shown how to extend SNAKES to model and execute time Petri nets (Merlin & Farber's version, that with clocks on transitions). The code originally written in 2008 is broken because of slight changes in SNAKES. This post is both an update and a more complete explanation of the implementation. You can grab the full code here.
Implementing the plugin
First we load all the necessary modules and define function extend as expected for plugins (the decorator is the first place where the 2008 code is broken). All the rest of the code below is thus nested inside function extend.
### start of tpn.py ###
from snakes import ConstraintError
import snakes.plugins
@snakes.plugins.plugin("snakes.nets")
def extend (module) :
Then, we extend class Transition by adding min and max firing times. We use None for the max firing time to model an infinite value. We also use an attribute time to store the current clock of each transition; it is set to None when the transition is disabled because of the marking. Method enabled is redefined to take time into account; it accepts an optional additional argument untimed that allows checking enabling without taking time into account. Apart from this detail, the extension is straightforward.
    class Transition (module.Transition) :
        def __init__ (self, name, guard=None, **args) :
            self.time = None
            self.min_time = args.pop("min_time", 0.0)
            self.max_time = args.pop("max_time", None)
            module.Transition.__init__(self, name, guard, **args)
        def enabled (self, binding, **args) :
            if args.pop("untimed", False) :
                return module.Transition.enabled(self, binding)
            elif self.time is None :
                return False
            elif self.max_time is None :
                return (self.min_time <= self.time) and module.Transition.enabled(self, binding)
            else :
                return (self.min_time <= self.time <= self.max_time) and module.Transition.enabled(self, binding)
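The timing part of this test can be looked at in isolation. The following is a minimal standalone sketch that does not use SNAKES at all; the function name timed_enabled is hypothetical and only mirrors the three cases handled by the redefined method (disabled clock, unbounded interval, bounded interval).

```python
def timed_enabled(clock, min_time=0.0, max_time=None):
    """Tell whether a clock value lies within the firing interval.

    Hypothetical helper: it reproduces only the timing test, not the
    marking-related check delegated to module.Transition.enabled.
    """
    if clock is None:
        # None means the transition is disabled because of the marking
        return False
    if max_time is None:
        # None models an infinite max firing time
        return min_time <= clock
    return min_time <= clock <= max_time


print(timed_enabled(None, 1.0, 3.0))  # disabled clock
print(timed_enabled(0.5, 1.0, 3.0))   # too early
print(timed_enabled(2.0, 1.0, 3.0))   # inside the interval
print(timed_enabled(9.0, 1.0))        # no upper bound
```

In the plugin itself, a positive answer here is still combined with the untimed enabling test for the given binding.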
Then, we extend places. The constructor adds two attributes that are normally added by PetriNet.add_place but are used sooner in our case to set the marking (this is the other place where the 2008 code is broken). Several methods are redefined to handle the clocks of the transitions in the post-set of each place. In particular, see how reset works: it calls the original method to handle the token-related aspects; then it goes through the transitions in the post-set and checks for each whether it has at least one mode; if so, its clock is reset to 0.0, otherwise it is set to None. Method empty is even simpler because all the transitions of the post-set have to be disabled. Then, methods add and remove are variants and use an auxiliary method _post_enabled that returns a dict indicating for each post-transition whether it is currently enabled, so that their clocks are updated selectively (some may be newly enabled in add, some may be newly disabled in remove).
    class Place (module.Place) :
        def __init__ (self, name, tokens=[], check=None, **args) :
            self.post = {}
            self.pre = {}
            module.Place.__init__(self, name, tokens, check, **args)
        def reset (self, tokens) :
            module.Place.reset(self, tokens)
            for name in self.post :
                trans = self.net.transition(name)
                if len(trans.modes()) > 0 :
                    trans.time = 0.0
                else :
                    trans.time = None
        def empty (self) :
            module.Place.empty(self)
            for name in self.post :
                self.net.transition(name).time = None
        def _post_enabled (self) :
            return dict((name, self.net.transition(name).time is not None)
                        for name in self.post)
        def add (self, tokens) :
            enabled = self._post_enabled()
            module.Place.add(self, tokens)
            for name in self.post :
                if not enabled[name] :
                    trans = self.net.transition(name)
                    if len(trans.modes()) > 0 :
                        trans.time = 0.0
        def remove (self, tokens) :
            enabled = self._post_enabled()
            module.Place.remove(self, tokens)
            for name in self.post :
                if enabled[name] :
                    trans = self.net.transition(name)
                    if len(trans.modes()) == 0 :
                        trans.time = None
The code for add is to be checked carefully against the precise semantics of time Petri nets one wants to use. Indeed, we have considered here that adding tokens does not change the clocks of transitions that are already enabled, but since it may bring new modes, it could be considered that the clock has to be reset in this case. Furthermore, we have considered only one clock per transition, which is correct for place/transition Petri nets (i.e., uncoloured). But one could want to consider one distinct clock for each mode, which would require a complete rewrite of this plugin (but I bet this would be quite straightforward).
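To make the two possible readings of add concrete, here is a small standalone sketch. It is only an illustration under assumptions: the function clock_after_add and its flag reset_on_new_modes are hypothetical names that exist neither in SNAKES nor in the plugin, and modes are represented by arbitrary hashable identifiers rather than real SNAKES substitutions.

```python
def clock_after_add(clock, modes_before, modes_after, reset_on_new_modes=False):
    """Return the clock of a post-transition once tokens have been added.

    clock is the current clock, or None if the transition was disabled.
    modes_before and modes_after are collections of hashable mode
    identifiers (an assumption made for this sketch).
    """
    if clock is None:
        # a newly enabled transition starts its clock, as in Place.add
        return 0.0 if modes_after else None
    if reset_on_new_modes and set(modes_after) - set(modes_before):
        # alternative semantics: the appearance of a new mode restarts the clock
        return 0.0
    # semantics chosen in the plugin: the clock keeps running
    return clock


print(clock_after_add(None, [], ["m1"]))
print(clock_after_add(1.5, ["m1"], ["m1", "m2"]))
print(clock_after_add(1.5, ["m1"], ["m1", "m2"], reset_on_new_modes=True))
```

With the flag left to False the function behaves like the add method above; switching it on gives the stricter reading in which any new mode restarts the clock.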
Finally, we extend Petri nets themselves with three new methods. First, reset resets all the transitions' clocks, which is achieved indirectly by resetting the marking of each place. Then, step computes the largest time step that the net is currently allowed to perform without crossing any time constraint (see the executions below). Last, time performs a time step of a given amount (or that returned by step if this amount is omitted). The implementation is straightforward; just note in time that min(None, ...) returns None (in Python 2, which this code targets) when self.step() returns None.
    class PetriNet (module.PetriNet) :
        def reset (self) :
            self.set_marking(self.get_marking())
        def step (self) :
            step = None
            for trans in self.transition() :
                if trans.time is None :
                    continue
                if trans.time < trans.min_time :
                    if step is None :
                        step = trans.min_time - trans.time
                    else :
                        step = min(step, trans.min_time - trans.time)
                elif trans.max_time is None :
                    pass
                elif trans.time <= trans.max_time :
                    if step is None :
                        step = trans.max_time - trans.time
                    else :
                        step = min(step, trans.max_time - trans.time)
            return step
        def time (self, step=None) :
            if step is None :
                step = self.step()
            else :
                step = min(self.step(), step)
            if step is None :
                return None
            for trans in self.transition() :
                if trans.time is not None :
                    trans.time += step
            return step
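The computation performed by step and time can also be tried outside SNAKES. The sketch below is a standalone approximation under assumptions: the functions largest_step and bounded_step are hypothetical, and clocks are given as plain (time, min_time, max_time) triples instead of transition objects. It handles None explicitly, because min(None, x) raises a TypeError in Python 3 instead of returning None as it does in Python 2.

```python
def largest_step(clocks):
    """Largest delay that crosses no min or max firing time.

    clocks is an iterable of (time, min_time, max_time) triples, where
    time is None for a disabled transition and max_time is None for an
    unbounded interval. Returns None when nothing constrains the delay.
    """
    candidates = []
    for time, min_time, max_time in clocks:
        if time is None:
            continue  # disabled transitions put no constraint on time
        if time < min_time:
            candidates.append(min_time - time)  # wait up to the min time
        elif max_time is not None and time <= max_time:
            candidates.append(max_time - time)  # wait up to the max time
    return min(candidates) if candidates else None


def bounded_step(allowed, requested=None):
    """Combine the allowed delay with a requested one, None-safe."""
    if allowed is None:
        return None  # same outcome as min(None, requested) in Python 2
    if requested is None:
        return allowed
    return min(allowed, requested)


# clocks of the three-transition net used in the executions below
print(largest_step([(0.0, 1, 1), (0.0, 2, 3), (0.0, 3, 5)]))
print(bounded_step(1.0, 0.25))
```

As in the plugin, a None result means that no time step is performed.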
The last step, small but crucial, is to have function extend return all the classes it has extended.
    return Transition, Place, PetriNet
### end of tpn.py ###
Using the plugin for time Petri nets executions
Let's see how to use this and how time is handled during executions.
Basically, an execution can alternate calls to PetriNet.time() with transition firings. But we'll look at two examples to clarify how time steps are computed.
First we import the plugin as a module, and plug it onto snakes.nets. Note that we cannot use snakes.plugins.load("tpn", "snakes.nets", "snk") because this would search for module tpn within the standard SNAKES installation (as snakes.plugins.tpn).
>>> import tpn
>>> import snakes.plugins
>>> snakes.plugins.load(tpn, "snakes.nets", "snk")
<module 'snk' from 'snakes/nets.pyc'>
>>> from snk import *
Then we build a simple Petri net with three transitions, each consuming a token in its own place, and we choose growing min/max firing times.
>>> n = PetriNet("stepper")
>>> for i in range(3) :
...     n.add_place(Place("p%s" % i, [dot]))
...     n.add_transition(Transition("t%s" % i, min_time=i+1, max_time=i*2+1))
...     n.add_input("p%s" % i, "t%s" % i, Value(dot))
>>> init = n.get_marking()
The first execution just alternates time steps and transition firings. Note how n.time() selects a delay that just enables the next transition. This is because step computes the largest delay that does not go past the next min or max firing time.
>>> n.reset()
>>> clock = 0.0
>>> for i in range(3) :
...     print " , ".join("%s[%s,%s]=%s"
...                      % (t, t.min_time, t.max_time,
...                         "#" if t.time is None else t.time)
...                      for t in sorted(n.transition()))
...     delay = n.time()
...     print "[%s]" % clock, "delay:", delay
...     clock += delay
...     print "[%s] fire: t%s" % (clock, i)
...     n.transition("t%s" % i).fire(Substitution())
t0[1,1]=0.0 , t1[2,3]=0.0 , t2[3,5]=0.0
[0.0] delay: 1.0
[1.0] fire: t0
t0[1,1]=# , t1[2,3]=1.0 , t2[3,5]=1.0
[1.0] delay: 1.0
[2.0] fire: t1
t0[1,1]=# , t1[2,3]=# , t2[3,5]=2.0
[2.0] delay: 1.0
[3.0] fire: t2
Now, if we perform two time steps in a row instead of just one, we can observe a difference. (Note also that while n.set_marking(init) resets the net to its initial marking, it also resets all the clocks because each transition has a place in its pre-set whose marking is reset, just like for method reset.)
>>> n.set_marking(init)
>>> clock = 0.0
>>> for i in range(3) :
...     print " , ".join("%s[%s,%s]=%s" % (t, t.min_time, t.max_time,
...                                        "#" if t.time is None else t.time)
...                      for t in sorted(n.transition()))
...     for j in range(2) :
...         delay = n.time()
...         print "[%s]" % clock, "delay:", delay
...         clock += delay
...     print "[%s] fire: t%s" % (clock, i)
...     n.transition("t%s" % i).fire(Substitution())
t0[1,1]=0.0 , t1[2,3]=0.0 , t2[3,5]=0.0
[0.0] delay: 1.0
[1.0] delay: 0.0
[1.0] fire: t0
t0[1,1]=# , t1[2,3]=1.0 , t2[3,5]=1.0
[1.0] delay: 1.0
[2.0] delay: 1.0
[3.0] fire: t1
t0[1,1]=# , t1[2,3]=# , t2[3,5]=3.0
[3.0] delay: 2.0
[5.0] delay: 0.0
[5.0] fire: t2
Let's consider each iteration of the external for loop in turn.
- The first call to time() is just like previously. But the next call performs no time step because this would exceed the max firing time of t0.
- After firing t0, a call to time() makes time progress until the min firing time of t1 is reached. But then, another call performs another time step until the max firing time of t1 is reached (and so subsequent calls would perform no time step). This is how step works: it allows reaching the next min or max firing time, but if a min time had already been reached before the call, then it is exceeded whenever this is allowed.
- No surprise for t2, this is just like for t0 except that the first step is larger.
So, a careful control of executions would not simply alternate calls to time() with transition firings. It must take into account that several transitions may be ready to fire at any point, and that not all the transitions should necessarily fire as early as possible.
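As a starting point for such a control, one can inspect the clocks before deciding what to do next. The following standalone sketch does not use SNAKES; the function classify and the notions of fireable and urgent transitions are hypothetical names introduced for this illustration. The sample state is the one reached in the second execution at global time 3.0, just before t1 is fired.

```python
def classify(clocks):
    """Split transitions into those that may fire and those that must.

    clocks maps a transition name to (time, min_time, max_time), where
    time is None for a disabled transition and max_time is None for an
    unbounded interval.
    """
    fireable, urgent = [], []
    for name, (time, min_time, max_time) in sorted(clocks.items()):
        if time is None or time < min_time:
            continue  # disabled, or too early to fire
        if max_time is None or time <= max_time:
            fireable.append(name)
            if max_time is not None and time >= max_time:
                urgent.append(name)  # no further delay is possible
    return fireable, urgent


# t0 has already fired, t1 and t2 both have their clock at 3.0
state = {"t0": (None, 1, 1), "t1": (3.0, 2, 3), "t2": (3.0, 3, 5)}
print(classify(state))
```

Here both t1 and t2 are ready to fire, but only t1 has reached its max firing time; a scheduler may fire either one, and t2 could still wait once t1 is out of the way.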