This quite old paper showed how to extend SNAKES to model and execute time Petri nets (Merlin & Farber's version, the one with clocks on transitions). The code, originally written in 2008, is now broken because of slight changes in SNAKES. This post is both an update and a more complete explanation of the implementation. You can grab the full code here.

Implementing the plugin

First we import everything we need and define function extend as expected for plugins (the decorator is the first place where the 2008 code is broken). All the code below is thus nested inside function extend.

### start of tpn.py ###
from snakes import ConstraintError
import snakes.plugins

@snakes.plugins.plugin("snakes.nets")
def extend (module) :

Then, we extend class Transition by adding min and max firing times. We use None for the max firing time to model an infinite value. We also use an attribute time to store the current clock of each transition; it is set to None when the transition is disabled because of the marking. Method enabled is redefined to take time into account. It accepts an optional additional argument untimed that allows checking enabling without taking time into account. Apart from this detail, the extension is straightforward.

    class Transition (module.Transition) :
        def __init__ (self, name, guard=None, **args) :
            self.time = None
            self.min_time = args.pop("min_time", 0.0)
            self.max_time = args.pop("max_time", None)
            module.Transition.__init__(self, name, guard, **args)
        def enabled (self, binding, **args) :
            if args.pop("untimed", False) :
                return module.Transition.enabled(self, binding)
            elif self.time is None :
                return False
            elif self.max_time is None :
                return (self.min_time <= self.time) and module.Transition.enabled(self, binding)
            else :
                return (self.min_time <= self.time <= self.max_time) and module.Transition.enabled(self, binding)
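
As an aside from the tpn.py listing, the timing part of enabled can be isolated as a small standalone function. This is only a sketch in plain Python that does not depend on SNAKES; the name in_firing_window is an assumption made for illustration and is not part of the plugin.

```python
def in_firing_window(clock, min_time=0.0, max_time=None):
    """Tell whether a transition may fire as far as time is concerned.

    clock is None when the marking disables the transition, and
    max_time is None when the latest firing time is infinite.
    """
    if clock is None:
        # no clock is running: the transition is disabled by the marking
        return False
    if max_time is None:
        # only the earliest firing time constrains the transition
        return min_time <= clock
    return min_time <= clock <= max_time

# t1 from the example further below has the window [2, 3]
for clock in (None, 1.0, 2.0, 3.0, 3.5):
    print("%s -> %s" % (clock, in_firing_window(clock, 2, 3)))
```

In the plugin itself, this test is combined with the untimed enabling check of the parent class, so a transition fires only if both the marking and its clock allow it.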

Then, we extend places. The constructor adds two attributes that are normally added by PetriNet.add_place but are needed sooner in our case to set the marking (this is the other place where the 2008 code is broken). Several methods are redefined to handle the clocks of the transitions in the post-set of each place. In particular, see how reset works: it calls the original method to handle the token-related aspects; then it goes through the transitions in the post-set and checks whether each one has at least one mode; if so, its clock is reset to 0.0, otherwise it is set to None. Method empty is even simpler because all the transitions of the post-set have to be disabled. Methods add and remove are variants that use an auxiliary method _post_enabled, which returns a dict indicating for each post-transition whether it is currently enabled, so that their clocks are updated selectively (some may be newly enabled in add, some may be newly disabled in remove).

    class Place (module.Place) :
        def __init__ (self, name, tokens=[], check=None, **args) :
            self.post = {}
            self.pre = {}
            module.Place.__init__(self, name, tokens, check, **args)
        def reset (self, tokens) :
            module.Place.reset(self, tokens)
            for name in self.post :
                trans = self.net.transition(name)
                if len(trans.modes()) > 0 :
                    trans.time = 0.0
                else :
                    trans.time = None
        def empty (self) :
            module.Place.empty(self)
            for name in self.post :
                self.net.transition(name).time = None
        def _post_enabled (self) :
            return dict((name, self.net.transition(name).time is not None)
                        for name in self.post)
        def add (self, tokens) :
            enabled = self._post_enabled()
            module.Place.add(self, tokens)
            for name in self.post :
                if not enabled[name] :
                    trans = self.net.transition(name)
                    if len(trans.modes()) > 0 :
                        trans.time = 0.0
        def remove (self, tokens) :
            enabled = self._post_enabled()
            module.Place.remove(self, tokens)
            for name in self.post :
                if enabled[name] :
                    trans = self.net.transition(name)
                    if len(trans.modes()) == 0 :
                        trans.time = None

The code for add should be checked carefully against the precise semantics of time Petri nets one wants to use. Indeed, we have considered here that adding tokens does not change the clock of a transition that is already enabled, but since it may bring new modes, one could argue that the clock has to be reset in this case. Furthermore, we have considered only one clock per transition, which is correct for place/transition Petri nets (i.e., uncoloured). But one could want a distinct clock for each mode, which would require a complete rewrite of this plugin (but I bet this would be quite straightforward).
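
To give an idea of what per-mode clocks could look like, here is a minimal standalone sketch in plain Python. It is not part of the plugin: the function update_mode_clocks is hypothetical, and it assumes that modes can be used as dict keys (they are plain strings here) and that a mode that stays enabled keeps its clock.

```python
def update_mode_clocks(clocks, modes):
    """Return the new clocks after a change of marking.

    clocks maps each previously enabled mode to its elapsed time,
    modes lists the modes enabled by the new marking.
    """
    # modes that remain enabled keep their clock, new modes start at 0.0,
    # and modes that disappeared are dropped
    return dict((mode, clocks.get(mode, 0.0)) for mode in modes)

before = {"x=1": 2.5, "x=2": 1.0}
after = update_mode_clocks(before, ["x=1", "x=3"])
print(sorted(after.items()))
```

With such a structure, time steps would have to consider the firing window of every mode rather than that of every transition.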

Finally, we extend Petri nets themselves with three new methods. First, reset resets the clocks of all the transitions, which is achieved indirectly by resetting the marking of each place. Then, step computes the largest time step that the net is currently allowed to perform without crossing any time constraint (see the executions below). Last, time performs a time step of a given amount (or of the amount returned by step if it is omitted). The implementation is straightforward; just note that, in time, min(None, ...) returns None when self.step() returns None. This relies on Python 2 ordering None below any number; Python 3 would raise a TypeError for this comparison.

    class PetriNet (module.PetriNet) :
        def reset (self) :
            self.set_marking(self.get_marking())
        def step (self) :
            step = None
            for trans in self.transition() :
                if trans.time is None :
                    continue
                if trans.time < trans.min_time :
                    if step is None :
                        step = trans.min_time - trans.time
                    else :
                        step = min(step, trans.min_time - trans.time)
                elif trans.max_time is None :
                    pass
                elif trans.time <= trans.max_time :
                    if step is None :
                        step = trans.max_time - trans.time
                    else :
                        step = min(step, trans.max_time - trans.time)
            return step
        def time (self, step=None) :
            if step is None :
                step = self.step()
            else :
                step = min(self.step(), step)
            if step is None :
                return None
            for trans in self.transition() :
                if trans.time is not None :
                    trans.time += step
            return step
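
As an aside from the tpn.py listing, the computation made by step and time can be replayed on plain tuples. The sketch below is standalone Python that does not depend on SNAKES; the names max_delay and bounded_delay are assumptions made for illustration. It handles None explicitly instead of relying on min(None, ...), so it also runs under Python 3.

```python
def max_delay(transitions):
    """Largest delay that crosses no min or max firing time.

    transitions is an iterable of (clock, min_time, max_time) triples,
    with clock None for disabled transitions and max_time None for
    an infinite latest firing time. Returns None if nothing bounds time.
    """
    bounds = []
    for clock, min_time, max_time in transitions:
        if clock is None:
            continue
        if clock < min_time:
            # next event: the earliest firing time is reached
            bounds.append(min_time - clock)
        elif max_time is not None and clock <= max_time:
            # next event: the latest firing time is reached
            bounds.append(max_time - clock)
    return min(bounds) if bounds else None

def bounded_delay(requested, allowed):
    """Mimic time(): clamp a requested delay to the allowed one."""
    if allowed is None:
        # as in the plugin, no time step is made when nothing bounds time
        return None
    if requested is None:
        return allowed
    return min(requested, allowed)

# the three transitions of the example below, all clocks at 0.0
print(max_delay([(0.0, 1, 1), (0.0, 2, 3), (0.0, 3, 5)]))
```

Note that a clock sitting exactly at its max firing time yields a bound of 0.0, which is why time cannot progress until the corresponding transition has fired or has been disabled.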

The last step, small but crucial, is to have function extend return all the classes it has extended.

    return Transition, Place, PetriNet
### end of tpn.py ###

Using the plugin for time Petri nets executions

Let's see how to use this and how time is handled during executions. Basically, an execution can alternate calls to PetriNet.time() with transition firings. But we'll go through two examples to clarify how time steps are computed.

First we import the plugin as a module, and plug it onto snakes.nets. Note that we cannot use snakes.plugins.load("tpn", "snakes.nets", "snk") because this would search module tpn within the standard SNAKES installation (as snakes.plugins.tpn).

>>> import tpn
>>> import snakes.plugins
>>> snakes.plugins.load(tpn, "snakes.nets", "snk")
<module 'snk' from 'snakes/nets.pyc'>
>>> from snk import *

Then we build a simple Petri net with three transitions, each consuming a token in its own place, and we choose growing min/max firing times.

>>> n = PetriNet("stepper")
>>> for i in range(3) :
...     n.add_place(Place("p%s" % i, [dot]))
...     n.add_transition(Transition("t%s" % i, min_time=i+1, max_time=i*2+1))
...     n.add_input("p%s" % i, "t%s" % i, Value(dot))
>>> init = n.get_marking()

The first execution just alternates time steps and transition firings. Note how n.time() selects a delay that just enables the next transition. This is because step computes the maximal time delay until the next min or max firing time is reached.

>>> n.reset()
>>> clock = 0.0
>>> for i in range(3) :
...     print " , ".join("%s[%s,%s]=%s"
...                      % (t, t.min_time, t.max_time,
...                         "#" if t.time is None else t.time)
...                      for t in sorted(n.transition()))
...     delay = n.time()
...     print "[%s]" % clock, "delay:", delay
...     clock += delay
...     print "[%s] fire: t%s" % (clock, i)
...     n.transition("t%s" % i).fire(Substitution())
t0[1,1]=0.0 , t1[2,3]=0.0 , t2[3,5]=0.0
[0.0] delay: 1.0
[1.0] fire: t0
t0[1,1]=# , t1[2,3]=1.0 , t2[3,5]=1.0
[1.0] delay: 1.0
[2.0] fire: t1
t0[1,1]=# , t1[2,3]=# , t2[3,5]=2.0
[2.0] delay: 1.0
[3.0] fire: t2

Now, if we perform two time steps in a row instead of just one, we can observe a difference. (Note also that while n.set_marking(init) resets the net to its initial marking, it also resets all the clocks because each transition has a place in its pre-set whose marking is reset, just like for method reset.)

>>> n.set_marking(init)
>>> clock = 0.0
>>> for i in range(3) :
...     print " , ".join("%s[%s,%s]=%s" % (t, t.min_time, t.max_time,
...                                        "#" if t.time is None else t.time)
...                      for t in sorted(n.transition()))
...     for j in range(2) :
...         delay = n.time()
...         print "[%s]" % clock, "delay:", delay
...         clock += delay
...     print "[%s] fire: t%s" % (clock, i)
...     n.transition("t%s" % i).fire(Substitution())
t0[1,1]=0.0 , t1[2,3]=0.0 , t2[3,5]=0.0
[0.0] delay: 1.0
[1.0] delay: 0.0
[1.0] fire: t0
t0[1,1]=# , t1[2,3]=1.0 , t2[3,5]=1.0
[1.0] delay: 1.0
[2.0] delay: 1.0
[3.0] fire: t1
t0[1,1]=# , t1[2,3]=# , t2[3,5]=3.0
[3.0] delay: 2.0
[5.0] delay: 0.0
[5.0] fire: t2

Let's consider each iteration of the outer for loop in turn.

  1. The first call to time() is just like previously. But the next call performs no time step because this would exceed the max firing time of t0.
  2. After firing t0, a call to time() makes time progress until the min firing time of t1 is reached. But then, another call performs another time step until the max firing time of t1 is reached (and so subsequent calls would perform no time step). This is how step works: it advances to the next min or max firing time, so if a min time had already been reached before the call, time moves past it as far as the max firing times allow.
  3. No surprise for t2, this is just like for t0 except that the first step is larger.

So, a careful control of executions would not simply alternate calls to time() with transition firings. It must take into account that several transitions may be ready to fire at any point, and that not all the transitions should necessarily fire as early as possible.
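
To make this concrete, here is a minimal standalone sketch in plain Python, independent of SNAKES, that sorts transitions into those that may fire now and those that have reached their max firing time and thus block any further time step. The function name classify and the tuple layout are assumptions made for illustration; they are not part of the plugin.

```python
def classify(transitions):
    """Split transitions according to their clocks.

    transitions is an iterable of (name, clock, min_time, max_time).
    Returns (fireable, urgent): the names of the transitions whose clock
    lies in the firing window, and among them those sitting exactly at
    their max firing time.
    """
    fireable, urgent = [], []
    for name, clock, min_time, max_time in transitions:
        if clock is None or clock < min_time:
            continue  # disabled, or too early to fire
        if max_time is None or clock <= max_time:
            fireable.append(name)
            if max_time is not None and clock == max_time:
                urgent.append(name)
    return fireable, urgent

# state of the second execution just before t1 fires at global time 3.0
state = [("t0", None, 1, 1), ("t1", 3.0, 2, 3), ("t2", 3.0, 3, 5)]
print(classify(state))
```

An execution controller could then pick any transition from the first list, and let time progress only while the second list is empty.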