Skip to content

Instantly share code, notes, and snippets.

@K4L1Ma
Created December 23, 2025 11:04
Show Gist options
  • Select an option

  • Save K4L1Ma/d22762ad3ad5fda779bf302fad2b0b39 to your computer and use it in GitHub Desktop.

Select an option

Save K4L1Ma/d22762ad3ad5fda779bf302fad2b0b39 to your computer and use it in GitHub Desktop.
Colored Petri Net Implementation in Go - Restaurant Simulation Example
package main
import (
	"fmt"
	"reflect"
	"slices"
	"strings"
)
// Identifier types used throughout the net.
type (
	// Name identifies a place or a transition.
	Name string

	// Label is the variable name under which an arc's tokens are stored in a
	// Binding.
	Label string
)
// Token is a typed value that travels through the net.
type Token[T any] struct {
	// id is "token-<type>"; it is derived from the payload type only, so two
	// tokens of the same type share the same id.
	id string
	// color is the reflect.Kind of the payload type.
	color reflect.Kind
	// data is the payload carried by the token.
	data T
	// priority defaults to 0 and can be set with WithPriority.
	priority uint
}

// NewToken wraps data in a Token and then applies opts in order.
//
// The id and color are taken from the dynamic type of data. When data is a
// nil interface value (for example NewToken[any](nil)), reflect.TypeOf
// returns nil, so the static type parameter T is used instead of panicking.
// Nil options are skipped.
func NewToken[T any](data T, opts ...func(*Token[T])) *Token[T] {
	typ := reflect.TypeOf(data)
	if typ == nil {
		// Only reachable when T is an interface type holding nil.
		typ = reflect.TypeOf((*T)(nil)).Elem()
	}

	t := &Token[T]{
		id:       fmt.Sprintf("token-%s", typ.String()),
		color:    typ.Kind(),
		data:     data,
		priority: 0,
	}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(t)
	}
	return t
}

// WithPriority returns a NewToken option that sets the token's priority.
func WithPriority[T any](priority uint) func(*Token[T]) {
	return func(t *Token[T]) {
		t.priority = priority
	}
}
// TokenList is an ordered collection of tokens. Elements are stored as any,
// so a list may hold *Token values as well as plain values.
//
// The read-only methods (IsEmpty, Count, First, Iter) treat a nil *TokenList
// as an empty list.
type TokenList struct {
	items []any
}

// NewTokenList returns a list holding the given tokens. The arguments are
// copied, so NewTokenList(s...) never shares storage with s.
func NewTokenList(initial ...any) *TokenList {
	return &TokenList{items: append([]any(nil), initial...)}
}

// IsEmpty reports whether the list holds no tokens.
func (tl *TokenList) IsEmpty() bool {
	return tl == nil || len(tl.items) == 0
}

// Count returns the number of tokens in the list.
func (tl *TokenList) Count() int {
	if tl == nil {
		return 0
	}
	return len(tl.items)
}

// First returns the first token, or nil when the list is empty.
func (tl *TokenList) First() any {
	if tl.IsEmpty() {
		return nil
	}
	return tl.items[0]
}

// Add appends a single token to the end of the list.
func (tl *TokenList) Add(t any) {
	tl.items = append(tl.items, t)
}

// AppendList appends every token of other to the list. A nil other is a
// no-op.
func (tl *TokenList) AppendList(other *TokenList) {
	if other == nil {
		return
	}
	tl.items = append(tl.items, other.items...)
}

// RemoveAll removes one occurrence from the list for each token in subset.
// A nil subset is a no-op.
func (tl *TokenList) RemoveAll(subset *TokenList) {
	if subset == nil {
		return
	}
	// Snapshot first: subset may be tl itself, and removal shifts elements.
	toRemove := append([]any(nil), subset.items...)
	for _, item := range toRemove {
		tl.RemoveOne(item)
	}
}

// RemoveOne removes the first token that is deeply equal to item, if any.
func (tl *TokenList) RemoveOne(item any) {
	index := tl.indexOf(item)
	if index == -1 {
		return
	}
	tl.removeAtIndex(index)
}

// indexOf returns the position of the first token for which
// reflect.DeepEqual(token, item) holds, or -1 when there is none.
func (tl *TokenList) indexOf(item any) int {
	if tl == nil {
		return -1
	}
	for i, existing := range tl.items {
		if reflect.DeepEqual(existing, item) {
			return i
		}
	}
	return -1
}

// removeAtIndex deletes the element at position i, preserving order.
func (tl *TokenList) removeAtIndex(i int) {
	last := len(tl.items) - 1
	copy(tl.items[i:], tl.items[i+1:])
	// Clear the vacated slot so the removed token is not kept reachable
	// through the backing array.
	tl.items[last] = nil
	tl.items = tl.items[:last]
}

// Iter calls yield for each token in order and stops as soon as yield
// returns false. Its signature allows `for token := range list.Iter`.
func (tl *TokenList) Iter(yield func(any) bool) {
	if tl == nil {
		return
	}
	for _, item := range tl.items {
		if !yield(item) {
			return
		}
	}
}
// Binding maps arc variables to the tokens selected for them during a single
// firing attempt.
type Binding struct {
	assignments map[Label]*TokenList
}

// NewBinding returns an empty Binding ready for Set and Get.
func NewBinding() *Binding {
	return &Binding{assignments: make(map[Label]*TokenList)}
}

// Set binds val to name. An empty name is ignored, so arcs declared without
// a variable leave the binding untouched.
func (b *Binding) Set(name Label, val *TokenList) {
	if name == "" {
		return
	}
	b.assignments[name] = val
}

// Get returns the tokens bound to name, or nil when name is unbound.
func (b *Binding) Get(name Label) *TokenList {
	return b.assignments[name]
}

// String renders the binding as "{label: [tokens], ...}".
//
// Labels are emitted in sorted order. Map iteration order is random, so
// without the sort the same binding could print differently between runs.
func (b *Binding) String() string {
	labels := make([]Label, 0, len(b.assignments))
	for label := range b.assignments {
		labels = append(labels, label)
	}
	slices.Sort(labels)

	parts := make([]string, 0, len(labels))
	for _, label := range labels {
		part := fmt.Sprintf("%s: %s", label, b.formatTokens(b.assignments[label]))
		parts = append(parts, part)
	}
	return "{" + joinWithComma(parts) + "}"
}

// formatTokens renders tokens as "[t1, t2, ...]" using each token's %v form.
// A nil list is rendered as "[]".
func (b *Binding) formatTokens(tokens *TokenList) string {
	var parts []string
	if tokens != nil {
		for token := range tokens.Iter {
			parts = append(parts, fmt.Sprintf("%v", token))
		}
	}
	return "[" + joinWithComma(parts) + "]"
}
// joinWithComma concatenates parts, separating consecutive elements with
// ", ". It returns "" for an empty or nil slice.
func joinWithComma(parts []string) string {
	return strings.Join(parts, ", ")
}
// Place is a named container holding the tokens currently located in it.
type Place struct {
	name   Name
	tokens *TokenList
}

// NewPlace creates a place called name that starts with an empty token list.
func NewPlace(name Name) *Place {
	place := &Place{name: name}
	place.tokens = NewTokenList()
	return place
}

// AddTokens appends every token of newTokens to the place.
func (p *Place) AddTokens(newTokens *TokenList) {
	held := p.tokens
	held.AppendList(newTokens)
}

// ConsumeTokens removes the tokens listed in toRemove from the place.
func (p *Place) ConsumeTokens(toRemove *TokenList) {
	held := p.tokens
	held.RemoveAll(toRemove)
}
// Markings records, per place, the tokens a firing has reserved for
// consumption.
type Markings map[*Place]*TokenList

// Strategy interfaces plugged into transitions and arcs.
type (
	// GuardStrategy decides whether a transition may fire with binding b.
	GuardStrategy interface {
		Check(b *Binding) bool
	}

	// InputStrategy picks the tokens an input arc takes from available.
	// Returning nil or an empty list means the arc cannot be satisfied.
	InputStrategy interface {
		Select(b *Binding, available *TokenList) *TokenList
	}

	// OutputStrategy builds the tokens an output arc deposits in its place.
	OutputStrategy interface {
		Produce(b *Binding) *TokenList
	}
)

// Arc connects a transition to a place. For input arcs, variable is the
// label under which the selected tokens are stored in the binding.
type Arc struct {
	place    *Place
	variable Label
}

// InputArc is an arc whose tokens are chosen by an InputStrategy.
type InputArc struct {
	Arc
	strategy InputStrategy
}

// OutputArc is an arc whose tokens are created by an OutputStrategy.
type OutputArc struct {
	Arc
	strategy OutputStrategy
}

// InputArcs is the ordered set of input arcs of one transition.
type InputArcs struct {
	arcs []*InputArc
}

// Resolve asks every input arc, in order, to select tokens from its place.
//
// It returns the resulting binding, the tokens to consume per place, and true
// when every arc selected at least one token. As soon as one arc selects
// nothing it returns (nil, nil, false). No place is modified.
//
// When several arcs read from the same place, a later arc only sees the
// tokens not yet reserved by earlier arcs, and the reservations are
// accumulated. Previously the later arc's selection replaced the earlier one
// in the markings, so tokens bound by the earlier arc were never consumed.
//
// debugName is accepted for the caller's convenience and is not used.
func (inputs *InputArcs) Resolve(debugName Name) (*Binding, Markings, bool) {
	binding := NewBinding()
	markings := make(Markings)
	for _, arc := range inputs.arcs {
		available := arc.place.tokens
		reserved, shared := markings[arc.place]
		if shared {
			// Hide the tokens already claimed by an earlier arc on this place.
			remaining := &TokenList{}
			remaining.AppendList(arc.place.tokens)
			remaining.RemoveAll(reserved)
			available = remaining
		}

		selected := arc.strategy.Select(binding, available)
		if selected == nil || selected.IsEmpty() {
			return nil, nil, false
		}

		if !shared {
			// Keep a private list so that growing the reservation never
			// alters the list stored in the binding.
			reserved = &TokenList{}
			markings[arc.place] = reserved
		}
		reserved.AppendList(selected)
		binding.Set(arc.variable, selected)
	}
	return binding, markings, true
}
// OutputArcs is the ordered set of output arcs of one transition.
type OutputArcs struct {
	arcs []*OutputArc
}

// GenerateAndDistribute runs every output arc's strategy against b, in arc
// order, and adds the produced tokens to the arc's place.
//
// A strategy that returns nil is treated as producing no tokens; previously
// a nil result crashed inside Place.AddTokens.
func (outputs *OutputArcs) GenerateAndDistribute(b *Binding) {
	for _, arc := range outputs.arcs {
		newTokens := arc.strategy.Produce(b)
		if newTokens == nil {
			continue
		}
		arc.place.AddTokens(newTokens)
	}
}
// TransitionLogic groups what a transition needs in order to fire: its input
// arcs, its output arcs and an optional guard.
type TransitionLogic struct {
	inputs  InputArcs
	outputs OutputArcs
	guard   GuardStrategy
}

// Transition is a named firing rule of the net.
type Transition struct {
	name  Name
	logic TransitionLogic
}

// AttemptFire fires the transition when its inputs resolve and its guard
// accepts the binding. It reports whether the transition fired.
func (t *Transition) AttemptFire() bool {
	binding, reserved, enabled := t.logic.inputs.Resolve(t.name)
	if enabled && t.checkGuard(binding) {
		t.executeFire(binding, reserved)
		return true
	}
	return false
}

// checkGuard evaluates the guard against b; a transition without a guard is
// always allowed.
func (t *Transition) checkGuard(b *Binding) bool {
	guard := t.logic.guard
	return guard == nil || guard.Check(b)
}

// executeFire logs the firing, removes the reserved input tokens and then
// produces the output tokens.
func (t *Transition) executeFire(b *Binding, toConsume Markings) {
	fmt.Printf(">>> FIRE: [%s] | Binding: %+v\n", t.name, b)
	t.consume(toConsume)
	t.produce(b)
}

// consume removes, place by place, the tokens listed in toConsume.
func (t *Transition) consume(toConsume Markings) {
	for source, reserved := range toConsume {
		source.ConsumeTokens(reserved)
	}
}

// produce hands b to the output arcs so they can emit their tokens.
func (t *Transition) produce(b *Binding) {
	outputs := &t.logic.outputs
	outputs.GenerateAndDistribute(b)
}
// PetriNet bundles the transitions and places of one net.
type PetriNet struct {
	transitions []*Transition
	places      []*Place
}

// RunUntilStable keeps executing steps until no transition can fire,
// printing a banner before and after. It panics once the number of fired
// steps exceeds the safety limit.
func (pn *PetriNet) RunUntilStable() {
	fmt.Println("--- STARTING SIMULATION ---")
	for step := 1; pn.RunStep(); step++ {
		pn.checkSafetyLimit(step)
	}
	fmt.Println("--- NETWORK STABLE ---")
}

// RunStep fires the first transition, in declaration order, that is able to
// fire. It reports whether any transition fired.
func (pn *PetriNet) RunStep() bool {
	for i := range pn.transitions {
		if fired := pn.transitions[i].AttemptFire(); fired {
			return true
		}
	}
	return false
}

// checkSafetyLimit panics when step is greater than 1000, which guards
// against nets that never become stable.
func (pn *PetriNet) checkSafetyLimit(step int) {
	const maxSteps = 1000
	if step <= maxSteps {
		return
	}
	panic("Safety limit reached")
}
// Payload types of the order-batching example.
type (
	// Order is a single order, identified by its text.
	Order string

	// Batch is an ordered group of orders.
	Batch []Order
)
// SelectFirst is an input strategy that takes the first token of the pool.
type SelectFirst struct{}

// Select returns a one-element list holding the first token of pool, or nil
// when pool is empty. The binding is not consulted.
func (s SelectFirst) Select(b *Binding, pool *TokenList) *TokenList {
	if pool.Count() > 0 {
		return NewTokenList(pool.First())
	}
	return nil
}
// JoinList is an output strategy that extends the current batch with the
// order selected in the same firing.
type JoinList struct{}

// Produce returns a single Batch token holding the orders of the token bound
// to "L" followed by the order bound to "x".
//
// The new batch is built in fresh storage. Appending directly to the old
// batch could write into its backing array and so alter data reachable
// through the consumed token.
//
// It panics when "x" or "L" is not bound to a token of the expected type.
func (s JoinList) Produce(b *Binding) *TokenList {
	order := b.Get("x").First().(*Token[Order])
	batch := b.Get("L").First().(*Token[Batch])

	extended := make(Batch, len(batch.data), len(batch.data)+1)
	copy(extended, batch.data)
	extended = append(extended, order.data)

	return NewTokenList(NewToken(extended))
}
// ResetContext is an output strategy that emits a fresh, empty batch.
type ResetContext struct{}

// Produce returns a list with one token wrapping an empty Batch. The binding
// is not consulted.
func (s ResetContext) Produce(b *Binding) *TokenList {
	emptyBatch := NewToken(Batch{})
	return NewTokenList(emptyBatch)
}
// PassThrough is an output strategy that forwards the bound batch unchanged.
type PassThrough struct{}

// Produce returns a list holding the very token bound to "L". It panics when
// "L" is not bound to a *Token[Batch].
func (s PassThrough) Produce(b *Binding) *TokenList {
	bound := b.Get("L")
	batchToken := bound.First().(*Token[Batch])
	return NewTokenList(batchToken)
}
// GuardAccumulate allows accumulation while the batch still has room.
type GuardAccumulate struct{}

// Check reports whether the batch bound to "L" holds fewer than 50 orders.
// It panics when "L" is not bound to a *Token[Batch].
func (g GuardAccumulate) Check(b *Binding) bool {
	batchToken := b.Get("L").First().(*Token[Batch])
	size := len(batchToken.data)
	return size < 50
}
// GuardRelease allows a batch to be released once it is full.
type GuardRelease struct{}

// Check reports whether the batch bound to "L" holds at least 50 orders.
//
// The comparison is >= rather than == so that it is the exact complement of
// GuardAccumulate's "< 50". With ==, a batch of more than 50 orders would
// satisfy neither guard and the net would stall without releasing it.
//
// It panics when "L" is not bound to a *Token[Batch].
func (g GuardRelease) Check(b *Binding) bool {
	L := b.Get("L").First().(*Token[Batch])
	return len(L.data) >= 50
}
// initializePlaces seeds the batching example: 55 orders named "Order-01" to
// "Order-55" go into pQueue and one empty batch goes into pContext.
func initializePlaces(pQueue, pContext *Place) {
	const orderCount = 55
	for n := 0; n < orderCount; n++ {
		label := fmt.Sprintf("Order-%02d", n+1)
		pQueue.tokens.Add(NewToken(Order(label)))
	}

	emptyBatch := NewToken(Batch{})
	pContext.tokens.Add(emptyBatch)
}
// createAccumulateTransition builds the "Accumulate" transition: it takes the
// first order from pQueue (bound to "x") and the first batch from pContext
// (bound to "L"), and puts the joined batch back into pContext. It is guarded
// by GuardAccumulate.
func createAccumulateTransition(pQueue, pContext *Place) *Transition {
	accumulate := &Transition{name: "Accumulate"}

	accumulate.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: pQueue, variable: "x"}, strategy: SelectFirst{}},
		{Arc: Arc{place: pContext, variable: "L"}, strategy: SelectFirst{}},
	}
	accumulate.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: pContext}, strategy: JoinList{}},
	}
	accumulate.logic.guard = GuardAccumulate{}

	return accumulate
}
// createReleaseTransition builds the "Release Batch" transition: it takes the
// first batch from pContext (bound to "L"), forwards it to pShipped and puts
// a fresh empty batch back into pContext. It is guarded by GuardRelease.
func createReleaseTransition(pContext, pShipped *Place) *Transition {
	release := &Transition{name: "Release Batch"}

	release.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: pContext, variable: "L"}, strategy: SelectFirst{}},
	}
	release.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: pShipped}, strategy: PassThrough{}},
		{Arc: Arc{place: pContext}, strategy: ResetContext{}},
	}
	release.logic.guard = GuardRelease{}

	return release
}
// printReport prints how many orders remain queued, how many batches were
// shipped and, when at least one was shipped, the size of the first batch.
// It panics if the first shipped token is not a *Token[Batch].
func printReport(pQueue, pShipped *Place) {
	leftovers := pQueue.tokens.Count()
	shipped := pShipped.tokens.Count()

	fmt.Printf("\n--- FINAL REPORT ---\n")
	fmt.Printf("Queue Leftovers: %d\n", leftovers)
	fmt.Printf("Shipped Batches: %d\n", shipped)

	if pShipped.tokens.IsEmpty() {
		return
	}
	first := pShipped.tokens.First().(*Token[Batch])
	fmt.Printf("First Batch Size: %d\n", len(first.data))
}
// main2 runs the order-batching example: it builds the three places, seeds
// them, wires the Accumulate and Release transitions, runs the net until no
// transition can fire and prints the final report.
func main2() {
	var (
		pQueue   = NewPlace("OrderQueue")
		pContext = NewPlace("EngineContext")
		pShipped = NewPlace("Shipped")
	)
	initializePlaces(pQueue, pContext)

	// Declaration order matters: RunStep fires the first enabled transition.
	accumulate := createAccumulateTransition(pQueue, pContext)
	release := createReleaseTransition(pContext, pShipped)

	net := &PetriNet{
		transitions: []*Transition{accumulate, release},
		places:      []*Place{pQueue, pContext, pShipped},
	}
	net.RunUntilStable()

	printReport(pQueue, pShipped)
}
// Domain types of the restaurant example.
type (
	// DishType is the category of a dish; routing transitions use it to pick
	// the station a dish is sent to.
	DishType string

	// Dish is one item of an order.
	Dish struct {
		Type   DishType
		Name   string
		Cooked bool
	}

	// Plate is what ends up served at a table.
	Plate struct {
		TableID int
		Dishes  []Dish
	}

	// RestOrder is an order placed by a table.
	RestOrder struct {
		TableID int
		Dishes  []Dish
	}
)

// Dish categories known to the example.
const (
	Pizza DishType = "Pizza"
	Pasta DishType = "Pasta"
	Steak DishType = "Steak"
	Fish  DishType = "Fish"
	Salad DishType = "Salad"
)
// SplitOrder is an output strategy that breaks an order into its dishes.
type SplitOrder struct{}

// Produce returns one token per dish of the order bound to "order" and logs
// each dish as it is split off. The dishes are emitted as plain Dish values.
//
// The log prefix had been corrupted by a character-encoding mix-up; it is
// restored to the memo emoji here.
//
// It panics when "order" is not bound to a *Token[RestOrder].
func (s SplitOrder) Produce(b *Binding) *TokenList {
	order := b.Get("order").First().(*Token[RestOrder])
	result := NewTokenList()
	for _, dish := range order.data.Dishes {
		result.Add(dish)
		fmt.Printf(" 📝 Split: %s\n", dish.Name)
	}
	return result
}
// SelectOrder is an input strategy that takes the first waiting order.
type SelectOrder struct{}

// Select returns a one-element list with the first token of pool, or nil
// when pool holds nothing. The binding is not consulted.
func (s SelectOrder) Select(b *Binding, pool *TokenList) *TokenList {
	if !pool.IsEmpty() {
		head := pool.First()
		return NewTokenList(head)
	}
	return nil
}
// SelectByType is an input strategy that picks a dish of one category.
type SelectByType struct {
	dishType DishType
}

// Select returns a one-element list with the first Dish in pool whose Type
// equals the configured dishType. Tokens that are not Dish values are
// skipped. It returns nil when no dish matches.
func (s SelectByType) Select(b *Binding, pool *TokenList) *TokenList {
	for i := range pool.items {
		dish, isDish := pool.items[i].(Dish)
		if !isDish || dish.Type != s.dishType {
			continue
		}
		return NewTokenList(dish)
	}
	return nil
}
// SelectAllDishes is an input strategy that waits until enough dishes are
// available and then takes the whole pool.
type SelectAllDishes struct {
	requiredCount int
}

// Select returns a copy of the entire pool once it holds at least
// requiredCount tokens, and nil before that. Every token in the pool is
// taken, even when there are more than requiredCount.
func (s SelectAllDishes) Select(b *Binding, pool *TokenList) *TokenList {
	if pool.Count() >= s.requiredCount {
		everything := NewTokenList()
		everything.AppendList(pool)
		return everything
	}
	return nil
}
// CookDish is an output strategy that marks the bound dish as cooked.
type CookDish struct{}

// Produce logs the dish bound to "dish" and returns a list holding a copy of
// it with Cooked set to true. It panics when "dish" is not bound to a Dish.
func (s CookDish) Produce(b *Binding) *TokenList {
	cooked := b.Get("dish").First().(Dish)
	fmt.Printf(" 🍳 Cooking: %s\n", cooked.Name)
	cooked.Cooked = true
	return NewTokenList(cooked)
}
// PassDish is an output strategy that forwards the bound dish unchanged.
type PassDish struct{}

// Produce returns a new list containing every token bound to "dish".
func (s PassDish) Produce(b *Binding) *TokenList {
	bound := b.Get("dish")
	forwarded := NewTokenList()
	forwarded.AppendList(bound)
	return forwarded
}
// ServePlate is an output strategy that gathers the ready dishes onto a
// single plate for one table.
type ServePlate struct {
	tableID int
}

// Produce collects every token bound to "dishes" into a Plate for the
// configured table, logs the delivery and returns the plate as the only
// token. It panics if any bound token is not a Dish.
func (s ServePlate) Produce(b *Binding) *TokenList {
	var dishes []Dish
	b.Get("dishes").Iter(func(token any) bool {
		dishes = append(dishes, token.(Dish))
		return true
	})

	fmt.Printf(" 🍽️ Waiter serving %d dishes to Table %d\n", len(dishes), s.tableID)

	plate := Plate{TableID: s.tableID, Dishes: dishes}
	return NewTokenList(plate)
}
// createSplitTransition builds an unguarded transition that takes one order
// from `from` (bound to "order") and deposits its dishes in `to`.
func createSplitTransition(name Name, from, to *Place) *Transition {
	split := &Transition{name: name}

	split.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: from, variable: "order"}, strategy: SelectOrder{}},
	}
	split.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: to}, strategy: SplitOrder{}},
	}

	return split
}
// createRoutingTransition builds an unguarded transition that moves one dish
// of category dishType from `from` (bound to "dish") to `to`.
func createRoutingTransition(name Name, from, to *Place, dishType DishType) *Transition {
	route := &Transition{name: name}

	route.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: from, variable: "dish"}, strategy: SelectByType{dishType: dishType}},
	}
	route.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: to}, strategy: PassDish{}},
	}

	return route
}
// createCookTransition builds an unguarded transition that takes the first
// dish waiting at station (bound to "dish") and places it, cooked, in ready.
func createCookTransition(name Name, station, ready *Place) *Transition {
	cook := &Transition{name: name}

	cook.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: station, variable: "dish"}, strategy: SelectFirst{}},
	}
	cook.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: ready}, strategy: CookDish{}},
	}

	return cook
}
// createServeAllTransition builds the "Waiter Serves Table" transition: once
// ready holds at least count dishes they are all taken (bound to "dishes")
// and a single plate is placed in served. The plate is always addressed to
// table 1.
func createServeAllTransition(ready, served *Place, count int) *Transition {
	serve := &Transition{name: "Waiter Serves Table"}

	serve.logic.inputs.arcs = []*InputArc{
		{Arc: Arc{place: ready, variable: "dishes"}, strategy: SelectAllDishes{requiredCount: count}},
	}
	serve.logic.outputs.arcs = []*OutputArc{
		{Arc: Arc{place: served}, strategy: ServePlate{tableID: 1}},
	}

	return serve
}
// main runs the restaurant example: one five-dish order is split into
// dishes, each dish is routed to its station, cooked, and finally served as
// a single plate once all dishes are ready.
//
// Two fixes compared with the previous version: the banner no longer passes a
// trailing "\n" to fmt.Println (reported by go vet; the printed output is
// unchanged), and the emoji that had been corrupted by a character-encoding
// mix-up are restored.
func main() {
	fmt.Println("🍝 ITALIAN RESTAURANT SIMULATION 🍕")
	fmt.Println("===================================")
	fmt.Println()

	pOrderQueue := NewPlace("OrderQueue")
	pDishQueue := NewPlace("DishQueue")
	pPizzaStation := NewPlace("PizzaStation")
	pPastaStation := NewPlace("PastaStation")
	pSteakStation := NewPlace("SteakStation")
	pFishStation := NewPlace("FishStation")
	pSaladStation := NewPlace("SaladStation")
	pReadyToServe := NewPlace("ReadyToServe")
	pServedTable := NewPlace("ServedTable")

	order := RestOrder{
		TableID: 1,
		Dishes: []Dish{
			{Type: Pizza, Name: "Margherita Pizza"},
			{Type: Pasta, Name: "Carbonara"},
			{Type: Steak, Name: "Bistecca Fiorentina"},
			{Type: Fish, Name: "Branzino al Forno"},
			{Type: Salad, Name: "Insalata Caprese"},
		},
	}
	pOrderQueue.tokens.Add(NewToken(order))
	fmt.Printf("📋 Order received: 1 order with %d dishes for Table %d\n\n", len(order.Dishes), order.TableID)

	// Declaration order is the firing priority: RunStep fires the first
	// transition in this slice that is able to fire.
	transitions := []*Transition{
		createSplitTransition("Split Order", pOrderQueue, pDishQueue),
		createRoutingTransition("Route Pizza", pDishQueue, pPizzaStation, Pizza),
		createRoutingTransition("Route Pasta", pDishQueue, pPastaStation, Pasta),
		createRoutingTransition("Route Steak", pDishQueue, pSteakStation, Steak),
		createRoutingTransition("Route Fish", pDishQueue, pFishStation, Fish),
		createRoutingTransition("Route Salad", pDishQueue, pSaladStation, Salad),
		createCookTransition("Cook Pizza", pPizzaStation, pReadyToServe),
		createCookTransition("Cook Pasta", pPastaStation, pReadyToServe),
		createCookTransition("Cook Steak", pSteakStation, pReadyToServe),
		createCookTransition("Cook Fish", pFishStation, pReadyToServe),
		createCookTransition("Prep Salad", pSaladStation, pReadyToServe),
		// The waiter only serves once every dish of the order is ready.
		createServeAllTransition(pReadyToServe, pServedTable, len(order.Dishes)),
	}

	net := &PetriNet{
		transitions: transitions,
		places: []*Place{
			pOrderQueue, pDishQueue, pPizzaStation, pPastaStation,
			pSteakStation, pFishStation, pSaladStation,
			pReadyToServe, pServedTable,
		},
	}
	net.RunUntilStable()

	fmt.Println("\n🍽️ SERVICE REPORT")
	fmt.Println("==================")
	fmt.Printf("Plates served: %d\n", pServedTable.tokens.Count())
	if !pServedTable.tokens.IsEmpty() {
		plate := pServedTable.tokens.First().(Plate)
		fmt.Printf("Table %d received %d dishes:\n", plate.TableID, len(plate.Dishes))
		for _, dish := range plate.Dishes {
			fmt.Printf(" ✅ %s\n", dish.Name)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment