The observer pattern, or how I switched from event loops to event-based algos.
Some of my first algos were written using an 'event loop' style of control. That is to say, when the algo was waiting for an event, it would keep looping over code until a condition became true (the event), and would then break or drop into another loop to wait for the next event.
For example:
Code:
import time

# execute_trade() and order_status() are defined elsewhere (not shown)
order_id = execute_trade('EUR/USD', 'long', 100000, 'market')
while True:
    if order_status(order_id) == 'Filled':
        print('Trade filled!')
        # break into a sub-loop
        while True:
            # do other stuff, like test for a stop out condition, etc..
            pass
    else:
        time.sleep(0.01)  # loop every 10 ms
This was fine for basic strategies, but the constant condition checking and CPU cycles burned on looping over and over while waiting for a condition to be met created a scaling problem I didn't realize I'd have until much later.
Put simply: while this worked fine for a handful of strategies running at a time, things got hairy as soon as I scaled up beyond that. Issues like CPU usage came up, and as the strategies became more complex, or had some actual math being done with each loop, the number I could run at the same time became a limiting factor.
One option would be to toss more hardware at the problem. However, why spend money on hardware when the real issue was my novice programming skills?
Enter: The observer pattern!
The observer pattern is a way to cut out event loops entirely and only react, or check conditions, when something (like a price quote) changes. Strategies 'subscribe' to updates on specific variables, like orders being filled or time and sales feeds, and a daemon that's in charge of updating these variables lets all subscribed objects know there's been an update.
An added advantage of the observer pattern is the ability to reference the trade algo's object from many other points in your application or framework without worrying about how to interrupt some loop. (Not to mention this helps get around the blocking nature of loops when it comes to Python's GIL and threading, but that's a technical talk for another time.)
To start off we're going to make 3 things: 1) An observable class to template the daemons we'll write. 2) At least 1 daemon to be in charge of a variable worth paying attention to. 3) A strategy / algo which will subscribe to events happening.
This won't be the absolute best way of implementing an observer pattern in Python, but it's the way I've used and currently have running in production code. As I've mentioned before, I'm not a programmer by trade; everything I've learned has been out of self-interest, so please feel free to comment or make suggestions if you think it can be improved.
1) The observable class:
Code:
class Observable(dict):
    """
    A class of thing that can be observed. When its notifyObservers()
    method is called with an event, it passes that event on to its
    observers.
    """
    def addObserver(self, event, observer):
        # self maps each event key to the list of observers subscribed to it
        if event in self:
            if observer not in self[event]:
                self[event].append(observer)
        else:
            self[event] = [observer]

    def removeObserver(self, event, observer):
        # .get() avoids a KeyError when nobody ever subscribed to this event
        if observer in self.get(event, []):
            self[event].remove(observer)

    def notifyObservers(self, event):
        if event in self:
            for observer in self[event]:
                # msg_name is set by the subclass (e.g. "L1"); the method name
                # has to match the ObserveEvent() method on the observer
                observer.ObserveEvent(event, self.msg_name)
Notice we start by subclassing 'dict'. This makes the observable object inherit all that a dictionary object can do, and we'll be extending this with a few functions.
Inheriting dict means the daemon itself can self-store who's subscribed to what data. So code like -- self['foo'] = 'bar' -- would be the same as creating a dictionary and assigning the 'bar' value to a key named 'foo'.
Lastly, notice we have methods to add, remove, and notify other objects of updates. You end up literally adding a reference to the other objects themselves to this internal dictionary of subscribers.
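To make the bookkeeping a bit more concrete, here's a rough sketch of subscribing, notifying, and unsubscribing. It's a Python 3 restatement of the class rather than the exact production code: the Recorder observer and the symbols are made up for illustration, msg_name is passed in through __init__ just to keep the example self-contained, and the callback is spelled ObserveEvent to match the algo class further down.

Code:
```python
class Observable(dict):
    """Maps an event key (e.g. a symbol) to the list of subscribed observers."""

    def __init__(self, msg_name):
        super().__init__()
        self.msg_name = msg_name

    def addObserver(self, event, observer):
        observers = self.setdefault(event, [])
        if observer not in observers:
            observers.append(observer)

    def removeObserver(self, event, observer):
        if observer in self.get(event, []):
            self[event].remove(observer)

    def notifyObservers(self, event):
        # Iterate over a copy so an observer may unsubscribe inside its callback.
        for observer in list(self.get(event, [])):
            observer.ObserveEvent(event, self.msg_name)


class Recorder(object):
    """Hypothetical observer that just remembers what it was told."""

    def __init__(self):
        self.seen = []

    def ObserveEvent(self, event, msg):
        self.seen.append((event, msg))


feed = Observable("L1")
a, b = Recorder(), Recorder()
feed.addObserver("EUR/USD", a)
feed.addObserver("EUR/USD", b)
feed.addObserver("USD/JPY", b)

feed.notifyObservers("EUR/USD")    # both a and b are told
feed.notifyObservers("USD/JPY")    # only b is told
feed.removeObserver("EUR/USD", b)
feed.notifyObservers("EUR/USD")    # only a is told now

print(a.seen)
print(b.seen)
```

The dictionary ends up holding the observer objects themselves, so after the removeObserver call feed["EUR/USD"] is just a list containing a.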
2) Now we inherit this new class to make our daemon:
Code:
import select
import socket
import threading
import traceback

class L1_Quote_Daemon(Observable):
    """
    A daemon that listens on an assigned UDP port for L1 quote updates.
    Quotes are stored in memory and observers get notified that there's a quote
    update.
    """
    def __init__(self, parent):
        self.msg_name = "L1"
        self.parent = parent
        t = threading.Thread(target=self.run)
        t.daemon = True  # don't keep the process alive just for this thread
        t.start()

    def run(self):
        bufferSize = 4096
        port = 50152
        a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        a.bind(('127.0.0.1', port))
        a.setblocking(1)
        while True:
            # select() sleeps until the socket has data, so nothing spins here
            result = select.select([a], [], [])
            try:
                msg = result[0][0].recv(bufferSize)
                ###
                ### Code that was here deals with the UDP packet/message, and extracts the important data like new bid and ask prices.
                ### This block would also store said data into memory that's readable by other parts of the application, as well as the trading algo.
                ###
                self.notifyObservers(symbol)  # 'symbol' comes from the parsing code cut out above
            except Exception:
                traceback.print_exc()  # working with sockets can be a pain, so I make sure to catch all faults and write the info to console.
                continue
A couple of things to note here:
My source of data in this case is a UDP socket connection to a custom port. I cut out the part that breaks the UDP packets down, since the specifics of the environment I'm working with aren't the focus. Your source can be nearly anything that can wait for data without spinning in a loop (for example: using RTD on a Windows platform to communicate with eSignal, using one of Python's FIX libraries to listen for new FIX messages, using pywin32's DDE client to listen for DDE value changes from ThinkOrSwim or Excel... etc... etc...)
I run the listening in a thread that the daemon creates itself. That way I just create an object in my main program and it takes care of itself. Its own internal while loop sits in select() until data arrives instead of spinning, so it won't waste CPU cycles while waiting for new data (this relates to socket / network programming.)
Since any fault will kill the daemon, I wrap the work in a try/except statement and make sure to log any faults that arise. You don't want garbage data to kill your quote updates, after all, and as much as you might trust your own code, the data you're dealing with might not be as clean as you'd hope (on that note: IDC/eSignal can sit on a tack, and even Oanda's RESTful API isn't perfect in this regard either.. timeouts suck.)
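For anyone wondering what the wait / parse / store / notify cycle might look like in a form you can poke at, here's a minimal sketch. To be clear about the assumptions: the 'SYMBOL,bid,ask' text packet is a made-up format and NOT the one my feed uses, the quotes dict stands in for the shared in-memory store, on_update stands in for notifyObservers(symbol), and max_packets / timeout only exist so the loop can end in a demo.

Code:
```python
import select
import socket


def parse_quote(packet):
    """Parse a made-up 'SYMBOL,bid,ask' text packet (hypothetical format)."""
    symbol, bid, ask = packet.decode("ascii").strip().split(",")
    return symbol, float(bid), float(ask)


def listen(sock, quotes, on_update, max_packets, timeout=1.0):
    """Sleep in select() until a datagram arrives, store it, then notify.

    quotes    -- plain dict standing in for the shared quote store
    on_update -- callable standing in for notifyObservers(symbol)
    """
    handled = 0
    while handled < max_packets:
        readable, _, _ = select.select([sock], [], [], timeout)
        if not readable:
            break  # nothing arrived within the timeout; end the demo loop
        handled += 1
        try:
            symbol, bid, ask = parse_quote(readable[0].recv(4096))
        except ValueError:
            continue  # skip garbage instead of letting it kill the listener
        quotes[symbol] = (bid, ask)  # store first, then tell the observers
        on_update(symbol)


print(parse_quote(b"EUR/USD,1.1012,1.1014\n"))
```

To try listen() you'd hand it a bound datagram socket, a dict, and any callable that takes a symbol. A malformed packet raises ValueError inside parse_quote (bad unpacking, bad float, or bad decoding), so it gets skipped and the loop carries on.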
Finally, we need to call this daemon into existence. Somewhere in the main thread of your application, usually as part of a startup sequence, we should find a line like this:
Code:
self.L1_D = L1_Quote_Daemon(self)
"self." because this would typically be called within a class that manages all your trades, or at least the highest level of your algo running (that is, you want to be able to reference it from other objects, so we gotta attach it to some context that's consistent.)
3) So now we have the daemon covered, we need something to listen to it:
Code:
class algo(object):
    """I'm a simple script that prints the latest B/A quotes to console"""
    strat_id = 'quote_printer'  # placeholder value; where strat_id really gets set isn't shown in this post

    def __init__(self, parent_self, symbol):
        self.name = self.strat_id
        self.parent = parent_self
        self.symbol = symbol
        self.subscribe()

    def subscribe(self):
        self.parent.L1_D.addObserver(self.symbol, self)

    def unsubscribe(self):
        self.parent.L1_D.removeObserver(self.symbol, self)

    def OnQuote(self):  # L1 server pokes this method.
        bid = float(self.parent.L1_feed[self.symbol][1])
        ask = float(self.parent.L1_feed[self.symbol][2])
        print(bid, ask)

    def OnDepth(self):  # L2 server pokes this method.
        pass  # code would be here to handle depth of market updates

    def OnOrder(self):  # ORD server pokes this method.
        pass  # code would be here to handle order ticket updates

    def OnTick(self):  # TOS server pokes this method.
        pass  # code would be here to handle time and sales updates

    def ObserveEvent(self, event, msg):
        """dispatch messages to event handlers"""
        msg_dispatch = {'ORD': self.OnOrder,
                        'L1': self.OnQuote,
                        'L2': self.OnDepth,
                        'TOS': self.OnTick}
        msg_dispatch[msg]()
There's a lot here to cover, but we can see that when we bring this algo into existence it starts (via __init__) by knowing the variables of its parent, which include the daemon and the quotes stored in memory.
We also run a subscribe function that has the algo tell the L1 daemon that it wants to listen for L1 updates on the 'subject' of the symbol it was given when we created the algo object. (This is important, as we don't want updates when any symbol updates its quote, we just want to listen for a single symbol.) Notice we pass the symbol we want, plus 'self', to the quote daemon.. we are literally pointing to the observing/algo object itself when it subscribes. In Python, 'self' inside a method is the object the method was called on, so passing 'self' means we're passing a reference to the object itself. This way the daemon knows, when the symbol has a quote update, that this specific object wants to know.
Finally, the ObserveEvent method is a catch-all for any type of update we might want to subscribe to. This way the Observable class only has to call one thing, instead of having to write out different observable classes and special calls for each type of daemon we want to make. The algo or object that subscribes for updates handles the data based on what type of message comes in... if it's an 'L1' message type, the appropriate method is called via a dispatch table.
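If it helps to see the three pieces talking to each other without a socket in the way, here's a cut-down sketch. Everything with a new name is an assumption for the sake of the demo: ManualQuoteDaemon replaces the UDP listener with a push() method you call by hand, Manager is a stand-in for whatever top-level object owns L1_D and L1_feed, and QuoteLogger is the algo class stripped down to record quotes instead of printing them. The (symbol, bid, ask) row layout is also a guess; all the algo code shows is that index 1 is the bid and index 2 is the ask.

Code:
```python
class Observable(dict):
    def addObserver(self, event, observer):
        if observer not in self.setdefault(event, []):
            self[event].append(observer)

    def notifyObservers(self, event):
        for observer in list(self.get(event, [])):
            observer.ObserveEvent(event, self.msg_name)


class ManualQuoteDaemon(Observable):
    """Stand-in for L1_Quote_Daemon: no thread, no socket, quotes pushed by hand."""

    def __init__(self, parent):
        super().__init__()
        self.msg_name = "L1"
        self.parent = parent

    def push(self, symbol, bid, ask):
        # Store first, then notify, so observers always read the fresh quote.
        self.parent.L1_feed[symbol] = (symbol, bid, ask)  # assumed row layout
        self.notifyObservers(symbol)


class Manager(object):
    """Hypothetical top-level object that owns the quote store and the daemon."""

    def __init__(self):
        self.L1_feed = {}
        self.L1_D = ManualQuoteDaemon(self)


class QuoteLogger(object):
    """Cut-down algo: subscribes to one symbol and records each quote it sees."""

    def __init__(self, parent, symbol):
        self.parent = parent
        self.symbol = symbol
        self.quotes = []
        self.parent.L1_D.addObserver(self.symbol, self)

    def OnQuote(self):
        row = self.parent.L1_feed[self.symbol]
        self.quotes.append((float(row[1]), float(row[2])))

    def ObserveEvent(self, event, msg):
        {"L1": self.OnQuote}[msg]()


manager = Manager()
eur = QuoteLogger(manager, "EUR/USD")
jpy = QuoteLogger(manager, "USD/JPY")

manager.L1_D.push("EUR/USD", 1.1012, 1.1014)
manager.L1_D.push("EUR/USD", 1.1013, 1.1015)
manager.L1_D.push("USD/JPY", 110.25, 110.27)

print(eur.quotes)  # only the two EUR/USD updates
print(jpy.quotes)  # only the one USD/JPY update
```

Each logger only hears about the symbol it subscribed to, which is the whole point of keying the subscriber dictionary by symbol.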
We could (and should) extend this further so that a message type with no handler set gets dealt with gracefully. But as it stands this will just raise a KeyError.
BTW, dispatch tables are awesome.. I much prefer using Python's hash-based dictionaries to create them, but the first time I mentioned dispatch tables was in my old algo journal, where I provided an error lookup dispatch table written in MQL4 for MT4.
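One possible way to handle the missing-handler case, sketched under a few assumptions: the OnUnhandled fallback and the log list are names I'm making up here, and the table is built once in __init__ rather than on every call. Using dict.get() means an unknown message type falls through to the fallback instead of blowing up.

Code:
```python
class Algo(object):
    """Hypothetical cut-down algo; only the dispatch logic matters here."""

    def __init__(self):
        self.log = []
        # Built once; the values are bound methods of this instance.
        self.msg_dispatch = {
            "L1": self.OnQuote,
            "ORD": self.OnOrder,
        }

    def OnQuote(self):
        self.log.append("quote")

    def OnOrder(self):
        self.log.append("order")

    def OnUnhandled(self, event, msg):
        # Hypothetical fallback: record the surprise instead of raising KeyError.
        self.log.append("unhandled %s message for %s" % (msg, event))

    def ObserveEvent(self, event, msg):
        handler = self.msg_dispatch.get(msg)
        if handler is None:
            self.OnUnhandled(event, msg)
        else:
            handler()


algo = Algo()
algo.ObserveEvent("EUR/USD", "L1")
algo.ObserveEvent("EUR/USD", "TOS")  # no TOS handler registered
print(algo.log)
```

Whether an unknown message should be logged, ignored, or still treated as a hard error is a judgment call; for a live trading system I'd lean toward at least logging it.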
My next post on this thread might be just about dispatch tables in python, that's how much I like them.
The code block for "OnQuote" can do much more than just print out the latest bid and ask quotes.. this is where the algo's logic would reside. We can control what happens at each quote update at various stages of the algo's logic by setting some sort of state on the algo itself. For instance, if the self.state variable is set to 'entry_filled', maybe the OnQuote method would be testing for a stop out condition to be met. A series of if / elif / else statements (or a dispatch table keyed by state, since Python has no classic switch statement) could accomplish this level of control.
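As a rough sketch of that idea: the 'entry_filled' state is the one mentioned above, while the 'stopped_out' state, the stop_price attribute, and the actions list are all made up for the example. Bid and ask are also passed in as arguments here just to keep it self-contained, whereas the real OnQuote reads them from the parent's quote store.

Code:
```python
class StatefulAlgo(object):
    """Hypothetical long-only algo whose OnQuote behaviour depends on self.state."""

    def __init__(self, stop_price):
        self.state = "entry_filled"   # pretend the entry order already filled
        self.stop_price = stop_price  # made-up stop level for a long position
        self.actions = []

    def OnQuote(self, bid, ask):
        if self.state == "entry_filled":
            # A long position exits at the bid, so test the bid against the stop.
            if bid <= self.stop_price:
                self.actions.append(("stop_out", bid))
                self.state = "stopped_out"
        elif self.state == "stopped_out":
            pass  # position is closed; ignore further quotes
        else:
            raise ValueError("unknown state: %r" % self.state)


algo = StatefulAlgo(stop_price=1.1000)
for bid, ask in [(1.1012, 1.1014), (1.0999, 1.1001), (1.0990, 1.0992)]:
    algo.OnQuote(bid, ask)

print(algo.state)
print(algo.actions)
```

The third quote is below the stop as well, but by then the state has already moved on, so the stop out only fires once.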
The results:
Now, I didn't convert all my loop-based algo work into observer pattern / event-based objects for nothing. The immediate result was going from 50 concurrently running strategies across 50 different securities taking up 50%+ CPU (these were more than just basic strategies that would need to check for updates every 5 ms or so) to running the same exact strategies over 50 symbols with the CPU ranging from 0-1% utilization, with the odd spike to 2-3% when heavy bursts of orders/actions would take place at once. This was all on commodity hardware, a low-end 2nd Gen Core i5 with 8 GB of RAM; I can only imagine the additional headroom I'd have running this on a production Xeon system.
That about wraps it up for now.. I'll review this post tomorrow with fresh eyes to catch any typos and to re-write parts I think could be more clear, but if you have any questions or comments please feel free to post them.