
Python Multi-processing Question?

I have a folder with 500 input files (total size of all files is ~500 MB). I'd like to write a Python script that does the following: (1) load all of the input files to memory (

Solution 1:

OK, I just whipped this up using ZeroMQ to demonstrate a single subscriber to multiple publishers. You could probably do the same with queues, but you would need to manage them a bit more. ZeroMQ sockets just work, which makes them nice for things like this, IMO.

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""from multiprocessing import Process


classWorker(Process):
    def__init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    defrun(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        withopen(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass# in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               inenumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workersfor w in workers:
        print"starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long# as at least one worker is alivewhile [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the resultprint result
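For comparison, the "do the same with queues" route mentioned above can look like the following. This is a minimal sketch using only the standard library, not part of the original answer; the names `worker`, `collect`, `line_queue`, and the `SENTINEL` value are my own choices. Each worker pushes its lines onto a shared `multiprocessing.Queue` and then a sentinel, so the collector knows when every worker has finished — this is the bookkeeping the answer alludes to.

```python
"""Fan-in of lines from many files via a shared multiprocessing.Queue."""
from multiprocessing import Process, Queue

SENTINEL = None  # each worker puts this on the queue when its file is exhausted


def worker(filename, line_queue):
    """Read one file and push its lines onto the shared queue."""
    with open(filename) as my_input:
        for line in my_input:
            line_queue.put(line)
    line_queue.put(SENTINEL)


def collect(filenames):
    """Run one worker per file and gather all lines into a single list."""
    line_queue = Queue()
    workers = [Process(target=worker, args=(f, line_queue)) for f in filenames]
    for w in workers:
        w.start()

    result = []
    finished = 0
    # keep reading until we have seen one sentinel per worker
    while finished < len(workers):
        item = line_queue.get()
        if item is SENTINEL:
            finished += 1
        else:
            result.append(item)

    for w in workers:
        w.join()
    return result
```

The sentinel counting replaces the `is_alive()` polling in the ZeroMQ version: the collector exits only after every worker has explicitly signalled completion, so no trailing messages are lost.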

Oh, and to get zmq, just:

$ pip install pyzmq
