spacepaste

  1.  
  2. import sys
  3. import zmq
  4. import time
  5. import logging
  6. from multiprocessing import Process
  7. import hashlib
  8. import binascii
  9. import struct
  10. logging.basicConfig(format="[%(asctime)s] " + \
  11. "[%(process)d %(threadName)s %(name)s] " + \
  12. "[%(module)s %(funcName)s %(lineno)d] " + \
  13. "[%(levelname)s] %(message)s", \
  14. level=logging.DEBUG)
  15. log = logging.getLogger(__name__)
  16. def hash(content):
  17. """
  18. Helper function, was being used
  19. to assist in ensuring durability
  20. but it was slowing things down.
  21. """
  22. return binascii.hexlify(
  23. hashlib.md5(
  24. pickle.dumps(content)
  25. ).digest()
  26. )
  27. put_url = 'ipc://msging.put'
  28. get_url = 'ipc://msging.get'
  29. shutdown_url = 'ipc://msging.shutdown'
  30. """
  31. Persistance layer, not in use because it slows things down.
  32. """
  33. pth = 'msging.persist'
  34. writer = open(pth, 'a')
  35. reader = open(pth, 'r+')
  36. def push(msg):
  37. data = pickle.dumps(msg)
  38. data = struct.pack('<L', len(data)) + data # size header
  39. data += '\x00' # marks if message has been read
  40. writer.write(data)
  41. def pop():
  42. while True:
  43. len, = struct.unpack('<L', reader.read(4))
  44. data = reader.read(len)
  45. if reader.read(1) != '\x00': # skipping read messages
  46. continue
  47. msg = pickle.loads(data)
  48. reader.seek(-1, 1)
  49. reader.write('\x01') # mark we have read message
  50. return msg
  51. def message_queue():
  52. log.debug("Creating context")
  53. context = zmq.Context()
  54. log.debug("Connecting to put socket")
  55. put = context.socket(zmq.PULL)
  56. put.bind(put_url)
  57. log.debug("Connecting to get socket")
  58. get = context.socket(zmq.REP)
  59. get.bind(get_url)
  60. log.debug("Connecting to shutdown socket")
  61. shutdown = context.socket(zmq.PULL)
  62. shutdown.bind(shutdown_url)
  63. log.debug("Connecting to poller")
  64. poller = zmq.Poller()
  65. poller.register(get, zmq.POLLIN)
  66. poller.register(put, zmq.POLLIN)
  67. poller.register(shutdown, zmq.POLLIN)
  68. lst = list()
  69. while True:
  70. comm = dict(poller.poll())
  71. if comm.get(put, False):
  72. msg = put.recv()
  73. lst.append(msg)
  74. if comm.get(get, False):
  75. get.recv()
  76. get.send(lst.pop())
  77. if comm.get(shutdown, False):
  78. log.warning("Shutting down")
  79. shutdown.recv()
  80. sys.exit()
  81. def put(msg, url = put_url):
  82. context = zmq.Context()
  83. put = context.socket(zmq.PUSH)
  84. put.connect(url)
  85. put.send(msg)
  86. def get(url = get_url):
  87. context = zmq.Context()
  88. get = context.socket(zmq.REQ)
  89. get.connect(url)
  90. get.send(str())
  91. msg = get.recv()
  92. return msg
  93. def shutdown(msg = None, url = shutdown_url):
  94. context = zmq.Context()
  95. shutdown = context.socket(zmq.PUSH)
  96. shutdown.connect(url)
  97. shutdown.send(str())
  98. mq = Process(target = message_queue)
  99. mq.start()
  100. time.sleep(1)
  101. max = 10000
  102. def produce():
  103. ts = time.time()
  104. count = 0
  105. try:
  106. for i in range(max):
  107. msg = "hello msg %d" % i
  108. put(msg)
  109. count += 1
  110. print "Sent %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts)))
  111. except:
  112. log.error(sys.exc_info())
  113. def retrieve():
  114. count = 0
  115. ts = time.time()
  116. try:
  117. for j in range(max):
  118. get()
  119. count += 1
  120. print "Recvd %d msgs at rate of %s msg/sec." % (count, str(count / (time.time() - ts)))
  121. except:
  122. log.error(sys.exc_info())
  123. p = Process(target = produce)
  124. p.start()
  125. time.sleep(0.1)
  126. r = Process(target = retrieve)
  127. r.start()
  128. p.join()
  129. r.join()
  130. shutdown()
  131. """
  132. Output:
  133. $ python test.py
  134. [2012-04-02 00:26:46,185] [31168 MainThread __main__] [test message_queue 71] [DEBUG] Creating context
  135. [2012-04-02 00:26:46,186] [31168 MainThread __main__] [test message_queue 74] [DEBUG] Connecting to put socket
  136. [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 78] [DEBUG] Connecting to get socket
  137. [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 82] [DEBUG] Connecting to shutdown socket
  138. [2012-04-02 00:26:46,187] [31168 MainThread __main__] [test message_queue 86] [DEBUG] Connecting to poller
  139. Sent 10000 msgs at rate of 1457.89044084 msg/sec.
  140. Recvd 10000 msgs at rate of 1244.68594864 msg/sec.
  141. [2012-04-02 00:26:55,330] [31168 MainThread __main__] [test message_queue 107] [WARNING] Shutting down
  142. """
  143.