import sys import zmq import time import logging from multiprocessing import Process import hashlib import binascii logging.basicConfig(format="[%(asctime)s] " + \ "[%(process)d %(threadName)s %(name)s] " + \ "[%(module)s %(funcName)s %(lineno)d] " + \ "[%(levelname)s] %(message)s", \ level=logging.DEBUG) log = logging.getLogger(__name__) def hash(content): """ Helper function, was being used to assist in ensuring durability but it was slowing things down. """ return binascii.hexlify( hashlib.md5( pickle.dumps(content) ).digest() ) put_url = 'ipc://msging.put' get_url = 'ipc://msging.get' shutdown_url = 'ipc://msging.shutdown' """ Persistance layer, not in use because it slows things down. """ pth = 'msging.persist' def push(msg): try: with open(pth, 'r') as f: lst = f.read() lst = pickle.loads(lst) except: log.error(sys.exc_info()) lst = list() lst.append(msg) with open(pth, 'w') as f: lst = pickle.dumps(lst) f.write(lst) def pop(): with open(pth, 'r') as f: lst = f.read() lst = pickle.loads(lst) msg = lst.pop() with open(pth, 'w') as f: lst = pickle.dumps(lst) f.write(lst) return msg def message_queue(): log.debug("Creating context") context = zmq.Context() log.debug("Connecting to put socket") put = context.socket(zmq.PULL) put.bind(put_url) log.debug("Connecting to get socket") get = context.socket(zmq.REP) get.bind(get_url) log.debug("Connecting to shutdown socket") shutdown = context.socket(zmq.PULL) shutdown.bind(shutdown_url) log.debug("Connecting to poller") poller = zmq.Poller() poller.register(get, zmq.POLLIN) poller.register(put, zmq.POLLIN) poller.register(shutdown, zmq.POLLIN) lst = list() while True: comm = dict(poller.poll()) if comm.get(put, False): msg = put.recv() lst.append(msg) if comm.get(get, False): get.recv() get.send(lst.pop()) if comm.get(shutdown, False): log.warning("Shutting down") shutdown.recv() sys.exit() def put(msg, url = put_url): context = zmq.Context() put = context.socket(zmq.PUSH) put.connect(url) put.send(msg) def get(url = get_url): context = zmq.Context() get = context.socket(zmq.REQ) get.connect(url) get.send(str()) msg = get.recv() return msg def shutdown(msg = None, url = shutdown_url): context = zmq.Context() shutdown = context.socket(zmq.PUSH) shutdown.connect(url) shutdown.send(str()) mq = Process(target = message_queue) mq.start() time.sleep(1) max = 10000 def produce(): ts = time.time() count = 0 try: for i in range(max): msg = "hello msg %d" % i put(msg) count += 1 print "Sent %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts))) except: log.error(sys.exc_info()) def retrieve(): count = 0 ts = time.time() try: for j in range(max): get() count += 1 print "Recvd %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts))) except: log.error(sys.exc_info()) p = Process(target = produce) p.start() time.sleep(0.1) r = Process(target = retrieve) r.start() p.join() r.join() shutdown() """ Output: $ python test.py [2012-04-02 00:26:46,185] [31168 MainThread __main__] [test message_queue 71] [DEBUG] Creating context [2012-04-02 00:26:46,186] [31168 MainThread __main__] [test message_queue 74] [DEBUG] Connecting to put socket [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 78] [DEBUG] Connecting to get socket [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 82] [DEBUG] Connecting to shutdown socket [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 86] [DEBUG] Connecting to poller Sent 10000 msgs at rate of 1457.89044084 msg/sec. Recvd 10000 msgs at rate of 1244.68594864 msg/sec. [2012-04-02 00:26:55,330] [31168 MainThread __main__] [test message_queue 107] [WARNING] Shutting down """