spacepaste

  1.  
  2. #!/usr/bin/env python
  3. """
  4. ZeroMQ Testing
  5. Python 2.6.7
  6. zeromq-2.1.10.tar.gz
  7. pyzmq-2.1.10.zip
  8. """
  9. import pickle
  10. import time
  11. import traceback as tb
  12. import sys
  13. from common import getLogger
  14. log = getLogger()
  15. try:
  16. import zmq
  17. except:
  18. sys.stderr.write("Unable to load zmq python extensions.")
  19. try:
  20. try:
  21. try:
  22. from multiprocessing import Process as Thread
  23. except:
  24. from processing import Process as Thread
  25. except:
  26. from threading import Thread
  27. except:
  28. sys.stderr.write("Unable to load multiprocessing")
  29. sys.exit(-1)
  30. def publish(url):
  31. zmq_context = zmq.Context()
  32. pub_socket = zmq_context.socket(zmq.PUB)
  33. pub_socket.bind(url)
  34. for i in range(0,1000):
  35. pub_socket.send(pickle.dumps(i))
  36. def recv(socket):
  37. ts = time.time()
  38. while time.time() - ts < 2:
  39. try:
  40. return pickle.loads(socket.recv(zmq.NOBLOCK))
  41. except zmq.ZMQError, e:
  42. if e.errno == zmq.EAGAIN:
  43. time.sleep(0.5)
  44. continue
  45. else:
  46. log.error("".join(tb.format_exception(*sys.exc_info())))
  47. def subscribe(url):
  48. zmq_context = zmq.Context()
  49. sub_socket = zmq_context.socket(zmq.SUB)
  50. sub_socket.setsockopt(zmq.SUBSCRIBE, "")
  51. sub_socket.connect(url)
  52. total = 0
  53. while True:
  54. msg = recv(sub_socket)
  55. if msg is None:
  56. log.debug("Exiting")
  57. break
  58. total += 1
  59. log.debug("%d msgs received" % (total))
  60. url = "tcp://*:7000"
  61. max_subs = 10
  62. log.debug("Creating subscribers")
  63. subs = list()
  64. for i in range(0,max_subs):
  65. sub = Thread(target=subscribe, args=(url,))
  66. sub.start()
  67. subs.append(sub)
  68. time.sleep(1)
  69. log.debug("Publishing to %s" % (url,))
  70. publish(url)
  71. log.debug("Joining subscribers")
  72. while len(subs) > 0:
  73. joined = None
  74. for s in subs:
  75. s.join(0.1)
  76. if not s.is_alive():
  77. joined = s
  78. if joined is not None:
  79. subs.remove(joined)
  80. """
  81. $ reset; ./test.py;
  82. 6800 Creating subscribers
  83. 6800 Publishing to tcp://*:7000
  84. 6800 Joining subscribers
  85. 6804 Exiting
  86. 6804 0 msgs received
  87. 6807 Exiting
  88. 6807 0 msgs received
  89. 6806 Exiting
  90. 6806 0 msgs received
  91. 6809 Exiting
  92. 6803 Exiting
  93. 6803 0 msgs received
  94. 6809 0 msgs received
  95. 6808 Exiting
  96. 6810 Exiting
  97. 6810 0 msgs received
  98. 6808 0 msgs received
  99. 6801 Exiting
  100. 6801 408 msgs received
  101. 6802 Exiting
  102. 6802 509 msgs received
  103. 6805 Exiting
  104. 6805 877 msgs received
  105. """
  106.