- #!/usr/bin/env python
- """
- ZeroMQ Testing
- Python 2.6.7
- zeromq-2.1.10.tar.gz
- pyzmq-2.1.10.zip
- """
- import pickle
- import time
- import traceback as tb
- import sys
- from common import getLogger
- log = getLogger()
# ZeroMQ bindings are mandatory: every function below uses ``zmq``.  Stop here
# with the diagnostic instead of continuing and failing later with NameError.
try:
    import zmq
except ImportError:
    sys.stderr.write("Unable to load zmq python extensions.")
    sys.exit(-1)

# Pick a worker class, preferring real processes.  Fallback order:
#   1. multiprocessing.Process (stdlib since Python 2.6)
#   2. processing.Process      (the older third-party package)
#   3. threading.Thread
# Only ImportError is caught so that SystemExit / KeyboardInterrupt are not
# swallowed the way a bare ``except:`` would.
try:
    from multiprocessing import Process as Thread
except ImportError:
    try:
        from processing import Process as Thread
    except ImportError:
        try:
            from threading import Thread
        except ImportError:
            sys.stderr.write("Unable to load multiprocessing")
            sys.exit(-1)
def publish(url, count=1000):
    """Bind a PUB socket to *url* and send ``count`` pickled integers.

    The integers ``0 .. count - 1`` are sent in order, each as one message
    containing ``pickle.dumps(i)``.

    Args:
        url: ZeroMQ endpoint to bind to (e.g. ``"tcp://*:7000"``).
        count: Number of messages to send.  Defaults to 1000.

    NOTE(review): sending starts immediately after ``bind``.  The sample
    output at the end of this file shows subscribers receiving fewer than
    1000 messages -- presumably because PUB sockets drop messages for peers
    that have not finished connecting yet; confirm against the ZeroMQ guide.
    """
    zmq_context = zmq.Context()
    try:
        pub_socket = zmq_context.socket(zmq.PUB)
        try:
            pub_socket.bind(url)
            for i in range(count):
                pub_socket.send(pickle.dumps(i))
        finally:
            # Release the socket explicitly rather than relying on garbage
            # collection.
            pub_socket.close()
    finally:
        # NOTE(review): term() may wait for queued messages to be flushed,
        # depending on the socket's linger setting -- confirm for the zmq
        # version in use.
        zmq_context.term()
def recv(socket, timeout=2, poll_interval=0.5):
    """Poll *socket* for one pickled message and return it unpickled.

    The socket is read in non-blocking mode.  While no message is available
    (``EAGAIN``) the call sleeps ``poll_interval`` seconds and retries until
    ``timeout`` seconds have elapsed.

    Args:
        socket: A ZeroMQ socket supporting ``recv(zmq.NOBLOCK)``.
        timeout: Overall time budget in seconds.  Defaults to 2.
        poll_interval: Sleep between polls in seconds.  Defaults to 0.5.

    Returns:
        The unpickled message, or ``None`` if the timeout expired or the
        socket reported an error other than ``EAGAIN`` (the error is logged).
    """
    started = time.time()
    while time.time() - started < timeout:
        try:
            payload = socket.recv(zmq.NOBLOCK)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # Nothing queued yet: back off and poll again.
                time.sleep(poll_interval)
                continue
            # Any other socket error: log it once and give up.  Retrying
            # immediately would spin without sleeping and flood the log
            # until the timeout expires.
            log.error("".join(tb.format_exception(*sys.exc_info())))
            return None
        # NOTE(security): pickle.loads executes arbitrary code from the
        # payload.  Only acceptable while every publisher is trusted.
        return pickle.loads(payload)
    return None
def subscribe(url):
    """Connect a SUB socket to *url* and count messages until ``recv`` gives up.

    Subscribes to every message (empty prefix), then calls ``recv`` in a loop.
    The loop stops at the first ``None`` result, after which the total number
    of messages received is logged at debug level.

    Args:
        url: ZeroMQ endpoint to connect to.
    """
    zmq_context = zmq.Context()
    try:
        sub_socket = zmq_context.socket(zmq.SUB)
        try:
            # Empty prefix subscribes to everything.  A bytes literal is the
            # same byte string on Python 2.6+ and stays bytes on Python 3.
            sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
            sub_socket.connect(url)
            total = 0
            while True:
                msg = recv(sub_socket)
                if msg is None:
                    log.debug("Exiting")
                    break
                total += 1
            log.debug("%d msgs received" % (total))
        finally:
            # Release the socket explicitly rather than relying on garbage
            # collection.
            sub_socket.close()
    finally:
        zmq_context.term()
# Endpoint the publisher binds to; "*" means all local interfaces.
url = "tcp://*:7000"
# Endpoint the subscribers connect to.  The wildcard form is only meaningful
# for bind, so subscribers get a concrete address on the same port.
connect_url = "tcp://127.0.0.1:7000"
max_subs = 10


def main():
    """Start ``max_subs`` subscribers, publish to them, then wait for them."""
    log.debug("Creating subscribers")
    subs = []
    for _ in range(max_subs):
        sub = Thread(target=subscribe, args=(connect_url,))
        sub.start()
        subs.append(sub)
    # Give the workers a moment to start before publishing.
    time.sleep(1)

    log.debug("Publishing to %s" % (url,))
    publish(url)

    log.debug("Joining subscribers")
    # Short timed joins, as in the original loop, instead of one blocking
    # join() per worker; every finished worker is dropped on each pass.
    pending = list(subs)
    while pending:
        for sub in pending:
            sub.join(0.1)
        pending = [sub for sub in pending if sub.is_alive()]


# Guard the entry point: when ``Thread`` is a multiprocessing Process, child
# processes may re-import this module and must not re-run the driver.
if __name__ == "__main__":
    main()
- """
- $ reset; ./test.py;
- 6800 Creating subscribers
- 6800 Publishing to tcp://*:7000
- 6800 Joining subscribers
- 6804 Exiting
- 6804 0 msgs received
- 6807 Exiting
- 6807 0 msgs received
- 6806 Exiting
- 6806 0 msgs received
- 6809 Exiting
- 6803 Exiting
- 6803 0 msgs received
- 6809 0 msgs received
- 6808 Exiting
- 6810 Exiting
- 6810 0 msgs received
- 6808 0 msgs received
- 6801 Exiting
- 6801 408 msgs received
- 6802 Exiting
- 6802 509 msgs received
- 6805 Exiting
- 6805 877 msgs received
- """