-
import binascii
import hashlib
import logging
import pickle
import sys
import time
from multiprocessing import Process

import zmq
-
# Root logger configuration: every record carries the timestamp, the
# process/thread/logger names, the source location and the level.
logging.basicConfig(
    format=(
        "[%(asctime)s] "
        "[%(process)d %(threadName)s %(name)s] "
        "[%(module)s %(funcName)s %(lineno)d] "
        "[%(levelname)s] %(message)s"
    ),
    level=logging.DEBUG,
)

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
-
-
def hash(content):
    """
    Return the hex-encoded MD5 digest of the pickled form of ``content``.

    Helper function that was being used to assist in ensuring durability,
    but it was slowing things down.

    Note: inside this module the name shadows the builtin ``hash``. The
    digest is computed over whatever bytes ``pickle.dumps`` produces for
    ``content``, so it is only comparable between runs that pickle the
    object identically.
    """
    serialized = pickle.dumps(content)
    digest = hashlib.md5(serialized).digest()
    return binascii.hexlify(digest)
-
# IPC endpoints shared by the broker (which binds them) and by the
# client helpers (which connect to them).
put_url = "ipc://msging.put"
get_url = "ipc://msging.get"
shutdown_url = "ipc://msging.shutdown"

# Persistence layer, not in use because it slows things down.
pth = "msging.persist"
-
def push(msg, path=None):
    """
    Append ``msg`` to the pickled list stored in the persistence file.

    The whole list is read, extended by one element and written back.

    :param msg: any picklable object to store.
    :param path: file to use; defaults to the module-level ``pth``.

    If the file cannot be read or unpickled, the error is logged and a
    fresh list containing only ``msg`` is written.
    """
    target = pth if path is None else path
    try:
        # Pickle data is binary, so the file is opened in binary mode.
        # NOTE(review): unpickling executes arbitrary code; the file must
        # only ever contain data written by this program.
        with open(target, 'rb') as f:
            stored = pickle.load(f)
    except Exception:
        # Best effort: a missing or unreadable file starts a new list.
        log.error(sys.exc_info())
        stored = list()
    stored.append(msg)
    with open(target, 'wb') as f:
        pickle.dump(stored, f)
-
def pop(path=None):
    """
    Remove and return the most recently pushed message from the
    persistence file.

    The stored list is read, its last element is removed, and the
    shortened list is written back, so messages come out in
    last-in-first-out order.

    :param path: file to use; defaults to the module-level ``pth``.
    :returns: the removed message.
    :raises IOError: if the file cannot be opened.
    :raises IndexError: if the stored list is empty.
    """
    target = pth if path is None else path
    # Pickle data is binary, so the file is opened in binary mode.
    # NOTE(review): unpickling executes arbitrary code; the file must
    # only ever contain data written by this program.
    with open(target, 'rb') as f:
        stored = pickle.load(f)
    msg = stored.pop()
    with open(target, 'wb') as f:
        pickle.dump(stored, f)
    return msg
-
-
-
-
def message_queue():
    """
    Run the in-memory message broker until a shutdown message arrives.

    Three sockets are bound on a private context:

    * a PULL socket on ``put_url`` that receives messages to store,
    * a REP socket on ``get_url`` that answers each request with one
      stored message,
    * a PULL socket on ``shutdown_url``; any message on it ends the
      process via ``sys.exit()``.

    Messages are kept in a plain list and handed out with ``list.pop()``,
    i.e. newest first.
    NOTE(review): if first-in-first-out delivery is intended, switch to
    ``collections.deque`` and ``popleft()``.

    A request that arrives while nothing is stored is answered as soon as
    the next message is put, instead of failing with ``IndexError``.
    """
    log.debug("Creating context")
    context = zmq.Context()

    log.debug("Connecting to put socket")
    put_sock = context.socket(zmq.PULL)
    put_sock.bind(put_url)

    log.debug("Connecting to get socket")
    get_sock = context.socket(zmq.REP)
    get_sock.bind(get_url)

    log.debug("Connecting to shutdown socket")
    shutdown_sock = context.socket(zmq.PULL)
    shutdown_sock.bind(shutdown_url)

    log.debug("Connecting to poller")
    poller = zmq.Poller()
    for sock in (get_sock, put_sock, shutdown_sock):
        poller.register(sock, zmq.POLLIN)

    messages = list()
    # True while a request has been received on the REP socket but its
    # reply has not been sent yet (because no message was available).
    reply_owed = False

    while True:
        ready = dict(poller.poll())

        if ready.get(put_sock, False):
            messages.append(put_sock.recv())

        if ready.get(get_sock, False):
            # Consume the (empty) request; the reply follows below.
            get_sock.recv()
            reply_owed = True

        if reply_owed and messages:
            # Per the ZeroMQ REQ/REP contract the REP socket accepts no
            # further request until this reply has been sent.
            get_sock.send(messages.pop())
            reply_owed = False

        if ready.get(shutdown_sock, False):
            log.warning("Shutting down")
            shutdown_sock.recv()
            sys.exit()
-
def put(msg, url=put_url):
    """
    Send ``msg`` to the broker's put endpoint.

    A fresh context and PUSH socket are created for the call and released
    before returning, rather than being left to garbage collection.

    :param msg: payload handed to ``socket.send``.
    :param url: endpoint to connect to; defaults to ``put_url``.
    """
    context = zmq.Context()
    try:
        sock = context.socket(zmq.PUSH)
        try:
            sock.connect(url)
            sock.send(msg)
        finally:
            sock.close()
    finally:
        # NOTE(review): with ZeroMQ's default linger setting, term() is
        # expected to wait until the queued message has been handed off;
        # confirm against the installed libzmq version.
        context.term()
-
def get(url=get_url):
    """
    Request one message from the broker's get endpoint and return it.

    An empty request is sent on a REQ socket and the reply is returned
    unchanged. The call blocks in ``recv`` until the broker answers.
    The context and socket created for the call are released before
    returning.

    :param url: endpoint to connect to; defaults to ``get_url``.
    :returns: the reply payload as received from the socket.
    """
    context = zmq.Context()
    try:
        sock = context.socket(zmq.REQ)
        try:
            sock.connect(url)
            # The request body carries no information; it only triggers
            # the reply.
            sock.send(b'')
            return sock.recv()
        finally:
            sock.close()
    finally:
        context.term()
-
def shutdown(msg=None, url=shutdown_url):
    """
    Ask the broker to stop by sending an empty message to its shutdown
    endpoint.

    The context and PUSH socket created for the call are released before
    returning.

    :param msg: accepted for interface compatibility; it is not used.
    :param url: endpoint to connect to; defaults to ``shutdown_url``.
    """
    context = zmq.Context()
    try:
        sock = context.socket(zmq.PUSH)
        try:
            sock.connect(url)
            sock.send(b'')
        finally:
            sock.close()
    finally:
        # NOTE(review): with ZeroMQ's default linger setting, term() is
        # expected to wait until the queued message has been handed off;
        # confirm against the installed libzmq version.
        context.term()
-
# Run the broker in its own process, then pause for a second before any
# client is started (presumably so the broker has bound its sockets).
mq = Process(target=message_queue)
mq.start()
time.sleep(1)

# Number of messages the producer sends and the consumer requests.
# Note: this name shadows the builtin ``max`` for the rest of the module.
max = 10000
def produce():
    """
    Send ``max`` messages through ``put`` and print the achieved rate.

    Each message is the text ``"hello msg <i>"``. Once all messages have
    been sent, the count and the messages-per-second rate are printed.
    Any ordinary exception is logged with its traceback and ends the run;
    ``KeyboardInterrupt`` and ``SystemExit`` are left to propagate.
    """
    started = time.time()
    count = 0
    try:
        for i in range(max):
            put("hello msg %d" % i)
            count += 1
        rate = count / (time.time() - started)
        # Single parenthesised argument: prints the same text whether
        # ``print`` is a statement or a function.
        print("Sent %d msgs at rate of %s msg/sec." % (count, str(rate)))
    except Exception:
        log.exception("Producer stopped after %d msgs", count)
-
def retrieve():
    """
    Request ``max`` messages through ``get`` and print the achieved rate.

    The replies themselves are discarded; only the number of completed
    requests is counted. Once all requests have completed, the count and
    the messages-per-second rate are printed. Any ordinary exception is
    logged with its traceback and ends the run; ``KeyboardInterrupt`` and
    ``SystemExit`` are left to propagate.
    """
    count = 0
    started = time.time()
    try:
        for _ in range(max):
            get()
            count += 1
        rate = count / (time.time() - started)
        # Single parenthesised argument: prints the same text whether
        # ``print`` is a statement or a function.
        print("Recvd %d msgs at rate of %s msg/sec." % (count, str(rate)))
    except Exception:
        log.exception("Consumer stopped after %d msgs", count)
-
# Start the producer first and pause briefly before starting the
# consumer.
p = Process(target=produce)
p.start()
time.sleep(0.1)

r = Process(target=retrieve)
r.start()

# Wait for both benchmark processes to finish, then stop the broker.
p.join()
r.join()

shutdown()
-
- """
- Output:
-
- $ python test.py
- [2012-04-02 00:26:46,185] [31168 MainThread __main__] [test message_queue 71] [DEBUG] Creating context
- [2012-04-02 00:26:46,186] [31168 MainThread __main__] [test message_queue 74] [DEBUG] Connecting to put socket
- [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 78] [DEBUG] Connecting to get socket
- [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 82] [DEBUG] Connecting to shutdown socket
- [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 86] [DEBUG] Connecting to poller
- Sent 10000 msgs at rate of 1457.89044084 msg/sec.
- Recvd 10000 msgs at rate of 1244.68594864 msg/sec.
- [2012-04-02 00:26:55,330] [31168 MainThread __main__] [test message_queue 107] [WARNING] Shutting down
- """
-