# spacepaste

import binascii
import hashlib
import logging
import pickle
import sys
import time
from multiprocessing import Process

import zmq
  9. logging.basicConfig(format="[%(asctime)s] " + \
  10. "[%(process)d %(threadName)s %(name)s] " + \
  11. "[%(module)s %(funcName)s %(lineno)d] " + \
  12. "[%(levelname)s] %(message)s", \
  13. level=logging.DEBUG)
  14. log = logging.getLogger(__name__)
  15. def hash(content):
  16. """
  17. Helper function, was being used
  18. to assist in ensuring durability
  19. but it was slowing things down.
  20. """
  21. return binascii.hexlify(
  22. hashlib.md5(
  23. pickle.dumps(content)
  24. ).digest()
  25. )
  26. put_url = 'ipc://msging.put'
  27. get_url = 'ipc://msging.get'
  28. shutdown_url = 'ipc://msging.shutdown'
"""
Persistence layer, not in use because it slows things down.
"""
  32. pth = 'msging.persist'
  33. def push(msg):
  34. try:
  35. with open(pth, 'r') as f:
  36. lst = f.read()
  37. lst = pickle.loads(lst)
  38. except:
  39. log.error(sys.exc_info())
  40. lst = list()
  41. lst.append(msg)
  42. with open(pth, 'w') as f:
  43. lst = pickle.dumps(lst)
  44. f.write(lst)
  45. def pop():
  46. with open(pth, 'r') as f:
  47. lst = f.read()
  48. lst = pickle.loads(lst)
  49. msg = lst.pop()
  50. with open(pth, 'w') as f:
  51. lst = pickle.dumps(lst)
  52. f.write(lst)
  53. return msg
  54. def message_queue():
  55. log.debug("Creating context")
  56. context = zmq.Context()
  57. log.debug("Connecting to put socket")
  58. put = context.socket(zmq.PULL)
  59. put.bind(put_url)
  60. log.debug("Connecting to get socket")
  61. get = context.socket(zmq.REP)
  62. get.bind(get_url)
  63. log.debug("Connecting to shutdown socket")
  64. shutdown = context.socket(zmq.PULL)
  65. shutdown.bind(shutdown_url)
  66. log.debug("Connecting to poller")
  67. poller = zmq.Poller()
  68. poller.register(get, zmq.POLLIN)
  69. poller.register(put, zmq.POLLIN)
  70. poller.register(shutdown, zmq.POLLIN)
  71. lst = list()
  72. while True:
  73. comm = dict(poller.poll())
  74. if comm.get(put, False):
  75. msg = put.recv()
  76. lst.append(msg)
  77. if comm.get(get, False):
  78. get.recv()
  79. get.send(lst.pop())
  80. if comm.get(shutdown, False):
  81. log.warning("Shutting down")
  82. shutdown.recv()
  83. sys.exit()
  84. def put(msg, url = put_url):
  85. context = zmq.Context()
  86. put = context.socket(zmq.PUSH)
  87. put.connect(url)
  88. put.send(msg)
  89. def get(url = get_url):
  90. context = zmq.Context()
  91. get = context.socket(zmq.REQ)
  92. get.connect(url)
  93. get.send(str())
  94. msg = get.recv()
  95. return msg
  96. def shutdown(msg = None, url = shutdown_url):
  97. context = zmq.Context()
  98. shutdown = context.socket(zmq.PUSH)
  99. shutdown.connect(url)
  100. shutdown.send(str())
  101. mq = Process(target = message_queue)
  102. mq.start()
  103. time.sleep(1)
  104. max = 10000
  105. def produce():
  106. ts = time.time()
  107. count = 0
  108. try:
  109. for i in range(max):
  110. msg = "hello msg %d" % i
  111. put(msg)
  112. count += 1
  113. print "Sent %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts)))
  114. except:
  115. log.error(sys.exc_info())
  116. def retrieve():
  117. count = 0
  118. ts = time.time()
  119. try:
  120. for j in range(max):
  121. get()
  122. count += 1
  123. print "Recvd %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts)))
  124. except:
  125. log.error(sys.exc_info())
  126. p = Process(target = produce)
  127. p.start()
  128. time.sleep(0.1)
  129. r = Process(target = retrieve)
  130. r.start()
  131. p.join()
  132. r.join()
  133. shutdown()
"""
Output:
$ python test.py
[2012-04-02 00:26:46,185] [31168 MainThread __main__] [test message_queue 71] [DEBUG] Creating context
[2012-04-02 00:26:46,186] [31168 MainThread __main__] [test message_queue 74] [DEBUG] Connecting to put socket
[2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 78] [DEBUG] Connecting to get socket
[2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 82] [DEBUG] Connecting to shutdown socket
[2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 86] [DEBUG] Connecting to poller
Sent 10000 msgs at rate of 1457.89044084 msg/sec.
Recvd 10000 msgs at rate of 1244.68594864 msg/sec.
[2012-04-02 00:26:55,330] [31168 MainThread __main__] [test message_queue 107] [WARNING] Shutting down
"""