#!/usr/bin/env python """ ZeroMQ Testing Python 2.6.7 zeromq-2.1.10.tar.gz pyzmq-2.1.10.zip """ import pickle import time import traceback as tb import sys from common import getLogger log = getLogger() try: import zmq except: sys.stderr.write("Unable to load zmq python extensions.") try: try: try: from multiprocessing import Process as Thread except: from processing import Process as Thread except: from threading import Thread except: sys.stderr.write("Unable to load multiprocessing") sys.exit(-1) def publish(url): zmq_context = zmq.Context() pub_socket = zmq_context.socket(zmq.PUB) pub_socket.bind(url) for i in range(0,1000): pub_socket.send(pickle.dumps(i)) def recv(socket): ts = time.time() while time.time() - ts < 2: try: return pickle.loads(socket.recv(zmq.NOBLOCK)) except zmq.ZMQError, e: if e.errno == zmq.EAGAIN: time.sleep(0.5) continue else: log.error("".join(tb.format_exception(*sys.exc_info()))) def subscribe(url): zmq_context = zmq.Context() sub_socket = zmq_context.socket(zmq.SUB) sub_socket.setsockopt(zmq.SUBSCRIBE, "") sub_socket.connect(url) total = 0 while True: msg = recv(sub_socket) if msg is None: log.debug("Exiting") break total += 1 log.debug("%d msgs received" % (total)) url = "tcp://*:7000" max_subs = 10 log.debug("Creating subscribers") subs = list() for i in range(0,max_subs): sub = Thread(target=subscribe, args=(url,)) sub.start() subs.append(sub) time.sleep(1) log.debug("Publishing to %s" % (url,)) publish(url) log.debug("Joining subscribers") while len(subs) > 0: joined = None for s in subs: s.join(0.1) if not s.is_alive(): joined = s if joined is not None: subs.remove(joined) """ $ reset; ./test.py; 6800 Creating subscribers 6800 Publishing to tcp://*:7000 6800 Joining subscribers 6804 Exiting 6804 0 msgs received 6807 Exiting 6807 0 msgs received 6806 Exiting 6806 0 msgs received 6809 Exiting 6803 Exiting 6803 0 msgs received 6809 0 msgs received 6808 Exiting 6810 Exiting 6810 0 msgs received 6808 0 msgs received 6801 Exiting 6801 408 msgs received 6802 Exiting 6802 509 msgs received 6805 Exiting 6805 877 msgs received """