How to create a thread safe queue?

I started to play around with a MIDI device and soon noticed that if you sleep in a listening thread you will miss some events during sleeping. Not so suprising really … no sleeping in the classroom kids :wink:

Anyway … since that I created separate listener thread and used get and set to create thread safe states. Which then brings us to the problem because states in a game are not really immutable. I need somehow to listen events and process them in separate thread.

So far I’v tried using shared variables ($events) … not a good idea. And then creating a copy of the object and passing that again using get and set:

set :events, [] # Init events array somewhere

# Thread for listening events from the novation launchpad
live_loop :listen_midi do
  use_real_time
  # midi note is touch position 11, 12, 13 ...
  # midi velocity is touch 127=on 0=off
  pad, touch = sync launchpad_in
  # note_on = pads, control_change = options
  type = sync_type launchpad_in
  
  events = (get :events).dup # Dup to create a copy
  
  xy = pad.to_s.chars
  x = xy[0].to_i
  y = xy[1].to_i
  
  if type=="note_on"
    if touch == 0 # Touch off
       # Push to list to be dealt with later
      set :events, events.push({event: :push, x: x, y: y})
    end
  end
end

# Thread for making things happen
live_loop :check_events do
  use_real_time
  
  events = (get :events).dup # Dupping again
  event = events.shift # pop first
  set :events, events # Set again -1 event
  
  if event[:type] == :push
    # Do something with the event
  end
  
  sleep 1 # Safe to sleep
end

Not sure if this is a right approach or should I turn to Ruby again and start using Queue?

Out of interest, why do you need to sleep in the listening thread?

If you simply use sync it will always return the next event. If the event has already arrived, it will return that, if an event is yet to arrive, it will wait for the next event and then return that:

live_loop :foo do
  a = sync :quux
  puts a
end

If you throw a sleep in there then that will have the effect of throttling the incoming events to at max 1 event per sleep time:

live_loop :bar do
  a = sync :quux
  puts a
  sleep 1
end

So, in the :bar live loop above, you’ll never process more than one :quux a second. In the :foo live loop you’ll process every single :quux event.

There shouldn’t be any need for queues if you stick to using the Time State system. However, it’s possible you might have found a case where this isn’t true and if so, I’d love to understand more about what you’re specifically trying to achieve.

Hi @amiika,
instead of copying event queues you could use a lock mechanism like that one

events = []
set :lock, false

live_loop :create_events do
  if (get :lock) == false
    set :lock, true
    events.push({"type" => "puts", "value" => tick})
    puts "#{events.length} events in queue" if events.length > 1
    set :lock, false
    cue :new_event
  end
  sleep rrand(0.3, 1.0)  # simulate external midi events
end


live_loop :process_events do
  sync :new_event
  while events.length > 0
    if (get :lock) == false
      set :lock, true
      event = events.shift
      set :lock, false
      if event
        if event["type"] == "puts"
          puts event["value"]
          sleep rrand(0.2, 1.0)  # simulate variable processing time
        end
      end
    end
  end
end

The lock ensures that only one loop is writing into the events array. Of course, this is imitating the queue data structure to some extend. Furthermore, an atomic block would be needed for reading and writing the lock.
EDIT: used some random timers to simulate external midi events

1 Like

Note that using a ‘global’ variable like this isn’t thread safe and will potentially result in non-deterministic behaviour due to race conditions. It’s also not supported and may or may not be possible in future versions as the language evolves.

It’s also important to point out that using variables like this (defined at the top and referenced within live loops) is fine and will be supported provided that you don’t modify them in any way, so something like foo = 1 would be fine. However, here you’re initialising an empty array and the subsequently modifying it within other threads.

1 Like

I was just doing that in the earlier version of the Minesweeper game. I needed to wait for few seconds before sending a signal to shut of a flashing cell, but doing that caused that I missed some midi events while sleeping. After that I started using separate threads for receiving and sending the midi events and thus needed a queue because I was still sleeping in the another thread while waiting some lights to flash.

But you are right. I don’t really need queues. I can just use separate threads for handling each blinking light to ensure that I don’t have to pile up old events. I still might need queue for something else, but not for this :slight_smile:

Nice use of fake events. Thanks! That helped me to figure out how to use cue and sync together. I think I can use cue to send processed events from the mini listening thread to another threads and avoid using that event queue.

1 Like

Here is an example of sending processed events with cues instead of creating event queue if someone else is wondering how to do such a thing:

live_loop :midi_listener do
  # Process some midi events here ... and send cues when processed
  cue :processed_event, x: tick, y: rrand_i(1,8) # Send some fake events
  sleep 0.1 # No need to sleep in a real midi listener
end

live_loop :process_events do
  event = sync :processed_event
  in_thread do
    print "Event "+event[:x].to_s+" "+event[:y].to_s
    sleep 1
    print "Finished with event "+event[:x].to_s+" "+event[:y].to_s
  end
end

Edit: Added thread for sleeping

In a real listener you could use use_real_time and no need to sleep in that thread.

Hi,
this way you may loose events. If you change the line

cue :processed_event, x: rrand_i(1,8), y: rrand_i(1,8) # Send some fake events

to

cue :processed_event, x: tick, y: rrand_i(1,8) # Send some fake events

and observe the output you will notice that only approx 1 out of 10 events is being processed. Cues are not queued. In the final system it will depend on the processing times and the event frequency whether this will create a problem.

Actually, all cues are stored. However, the difference with traditional queues is that get and sync use the logical time to access the ‘next’ event. If no sleep happens between subequent calls to get or sync, then for that thread it will behave just as if the cues all went into an old-school queue.

The difference between get and sync is that sync will act like a special sleep where the sleep time is calculated to be precisely the time difference between the current time and the time of the next event (which may have already arrived). get does not advance time so it does need to be used in conjunction with sleep or other calls to sync for it to work correctly (otherwise time stalls and the thread dies due to running too far behind).

Essentially, the Sonic Pi way is to have lots of threads/live_loops each processing the events that they are interested in.

Thanks for the explanation. So how would you then access the queued events in the process_events- loop? Using get within a while loop and checking whether new events still arrive?

Just use sync in its own live loop:

live_loop :foo do
  data = sync "/path/to/sync/on"
  do_stuff_with(data)
end

This will not miss any incoming events which match the sync :slight_smile:

1 Like

Apparently I am doing something wrong here:

live_loop :midi_listener do
  # Process some midi events here ... and send cues when processed
  cue :processed_event, x: tick, y: rrand_i(1,8) # Send some fake events
  sleep 0.1 # No need to sleep in a real midi listener
end

live_loop :process_events do
  event = sync "/cue/processed_event"
  print event
  sleep 1
end

The process_events-loop only catches every 10th event … like sync :processed_event does (?)

And one other point - if in do_stuff_with(data) you need to include a sleep, then you can put that in its own thread:

live_loop :process_events do
  event = sync :processed_event
  in_thread do
    print "Event "+event[:x].to_s+" "+event[:y].to_s
    sleep 1 
    print "Finished with event "+event[:x].to_s+" "+event[:y].to_s
  end
end
2 Likes

Yep, if you want the :process_events to process every event, then it shouldn’t contain a sleep. No sleeping on the job!

Oh yes … got it … :scream:
Thanks!

Yes. Fixed my example. Thanks!