2015-11-02 Multiprocessing

Testing some multiprocessing and multithreading libraries

In [ ]:
import time
import zmq
from  multiprocessing import Process

# The "ventilator" function generates the numbers 0 through 9999 and
# sends them down a zeromq "PUSH" connection to be processed by
# listening workers, in a round-robin, load-balanced fashion.

def ventilator():
    """Push 10000 work messages to the workers over a ZeroMQ PUSH socket.

    Binds a PUSH socket on tcp://127.0.0.1:5557 and sends one JSON message
    of the form ``{'num': n}`` for every ``n`` in ``range(10000)``, i.e. the
    numbers 0 through 9999. The socket and the context are released when the
    function ends, including when binding or sending raises.
    """
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    try:
        ventilator_send.bind("tcp://127.0.0.1:5557")

        # Give everything a second to spin up and connect
        time.sleep(1)

        # Send the numbers 0 through 9999 as work messages
        for num in range(10000):
            work_message = { 'num' : num }
            ventilator_send.send_json(work_message)

        # Keep the socket open for another second before tearing down
        # (presumably so queued messages have time to leave - confirm)
        time.sleep(1)
    finally:
        # Release the socket and context explicitly instead of relying on
        # process exit / garbage collection
        ventilator_send.close()
        context.term()

# The "worker" functions listen on a zeromq PULL connection for "work" 
# (numbers to be processed) from the ventilator, square those numbers,
# and send the results down another zeromq PUSH connection to the 
# results manager.

def worker(wrk_num):
    """Square numbers received from the ventilator until told to stop.

    Connects a PULL socket to tcp://127.0.0.1:5557 (incoming work), a PUSH
    socket to tcp://127.0.0.1:5558 (outgoing results) and a SUB socket to
    tcp://127.0.0.1:5559 (control messages). Every work message
    ``{'num': n}`` is answered with ``{'worker': wrk_num, 'result': n * n}``.
    The loop ends when the control channel delivers ``b"FINISHED"``.

    Args:
        wrk_num: Number identifying this worker; it is echoed in every
            result message and in the shutdown log line.
    """
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)

    try:
        work_receiver.connect("tcp://127.0.0.1:5557")
        results_sender.connect("tcp://127.0.0.1:5558")
        control_receiver.connect("tcp://127.0.0.1:5559")
        # The subscription prefix must be bytes: a str raises TypeError on
        # Python 3. An empty prefix subscribes to every message.
        control_receiver.setsockopt(zmq.SUBSCRIBE, b"")

        # Set up a poller to multiplex the work receiver and control receiver channels
        poller = zmq.Poller()
        poller.register(work_receiver, zmq.POLLIN)
        poller.register(control_receiver, zmq.POLLIN)

        # Loop and accept messages from both channels, acting accordingly
        while True:
            socks = dict(poller.poll())

            # If the message came from work_receiver channel, square the number
            # and send the answer to the results reporter
            if socks.get(work_receiver) == zmq.POLLIN:
                work_message = work_receiver.recv_json()
                product = work_message['num'] * work_message['num']
                answer_message = { 'worker' : wrk_num, 'result' : product }
                results_sender.send_json(answer_message)

            # If the message came over the control channel, shut down the worker.
            if socks.get(control_receiver) == zmq.POLLIN:
                control_message = control_receiver.recv()
                # recv() returns bytes, so compare against a bytes literal;
                # comparing with a str is always False on Python 3 and the
                # worker would never stop.
                if control_message == b"FINISHED":
                    print("Worker %i received FINISHED, quitting!" % wrk_num)
                    break
    finally:
        # Release all sockets and the context once the worker is done
        work_receiver.close()
        results_sender.close()
        control_receiver.close()
        context.term()

# The "result_manager" function receives each result from the workers
# and prints it. When all results have been received, it signals
# the worker processes to shut down.

def result_manager():
    """Collect 10000 results from the workers, then tell the workers to stop.

    Binds a PULL socket on tcp://127.0.0.1:5558 for incoming results and a
    PUB socket on tcp://127.0.0.1:5559 for control messages. Each received
    result is printed; after the 10000th one, ``b"FINISHED"`` is published on
    the control channel. The sockets and context are released afterwards.
    """
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)

    try:
        results_receiver.bind("tcp://127.0.0.1:5558")
        control_sender.bind("tcp://127.0.0.1:5559")

        for task_nbr in range(10000):
            result_message = results_receiver.recv_json()
            print( "Worker %i answered: %i" % (result_message['worker'], result_message['result']))

        # Signal to all workers that we are finished.
        # send() needs bytes: passing a str raises TypeError on Python 3.
        control_sender.send(b"FINISHED")
        # Wait before tearing the sockets down
        # (presumably so the control message can reach every worker - confirm)
        time.sleep(5)
    finally:
        results_receiver.close()
        control_sender.close()
        context.term()

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(10)
    for wrk_num in worker_pool:
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    # The Process objects get their own names: rebinding `result_manager` or
    # `ventilator` would overwrite the functions, so running this cell a
    # second time would pass a Process object as the target.
    result_manager_proc = Process(target=result_manager, args=())
    result_manager_proc.start()

    # Start the ventilator!
    ventilator_proc = Process(target=ventilator, args=())
    ventilator_proc.start()
In [ ]:
# multiproc_test.py

import random
import multiprocessing


def list_append(count, id, out_list):
	"""
	Append 'count' random floats, each drawn with
	random.random(), to the end of 'out_list' (in place).

	'id' is accepted but not used here. Nothing is returned.
	"""
	out_list.extend(random.random() for _ in range(count))

if __name__ == "__main__":
	size = 10000000   # How many random numbers each process appends
	procs = 2   # How many processes to launch

	# Prepare one not-yet-started process per slot; every
	# process is handed its own freshly created list
	jobs = []
	for i in range(procs):
		out_list = []
		jobs.append(
			multiprocessing.Process(target=list_append, args=(size, i, out_list))
		)

	# Launch every process (each one fills its list with random numbers)
	for job in jobs:
		job.start()

	# Block until every process has exited
	for job in jobs:
		job.join()

	print ("List processing complete.")
    
In [ ]:
from multiprocessing import Process
from numpy import random


# Module-level array of 10**4 random floats; child() sums it in every process
global_array = random.random(size=10000)


def child():
    """Print the sum of the module-level ``global_array``."""
    total = sum(global_array)
    print (total)


def main():
    """Run child() in ten separate processes and wait for all of them."""
    processes = []
    for _ in range(10):
        processes.append(Process(target=child))

    # Start everything first, then wait, so the children run concurrently
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

if __name__ == "__main__":
    main()
In [ ]:
# --daemonize puts the cluster in the background so this cell returns;
# without it the shell command keeps the cell busy for as long as the
# cluster is running and no later cell can execute.
!ipcluster start -n 4 --daemonize
In [ ]:
from IPython import parallel
# Create a client object from IPython's parallel module
# (presumably connects to the cluster started with ipcluster - confirm)
clients = parallel.Client()
# NOTE(review): presumably makes calls through this client return without
# waiting for their results - confirm against the IPython.parallel docs
clients.block = False
# Print the client's `ids` attribute (presumably the engine ids - confirm)
print( clients.ids)
In [ ]:
"""REQ/REP modified with QUEUE/ROUTER/DEALER add-on ---------------------------

   Multithreaded Hello World server

   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import time
import threading
import zmq

# NOTE(review): presumably zmq.__version__ is the version of the Python
# binding rather than of the underlying ZeroMQ library - confirm
print ("ZeroMQ version sanity-check: ", zmq.__version__)

def aWorker_asRoutine( aWorker_URL, aContext = None ):
    """Answer requests on a REP socket connected to aWorker_URL.

    aContext is the zmq context used to create the socket; when it is falsy,
    zmq.Context.instance() is used instead. Every received message is
    printed, followed by a one-second pause and a b"World" reply. The loop
    has no exit condition, so the function only ends if a call raises.
    """
    # Fall back to the shared context when the caller did not supply one
    if not aContext:
        aContext = zmq.Context.instance()

    # REP socket on which requests arrive and replies go out
    responder = aContext.socket( zmq.REP )
    responder.connect( aWorker_URL )

    while True:
        request = responder.recv()
        print( "Received request: [ %s ]" % ( request ) )

        # Simulate some work
        time.sleep(1)

        # Reply to whoever sent the request
        responder.send( b"World" )


def main():
    """Server routine: relay client requests to a pool of worker threads.

    Binds a ROUTER socket on tcp://*:5555 for clients and a DEALER socket on
    inproc://workers for the workers, starts five threads running
    aWorker_asRoutine, and then forwards messages between the two sockets.
    Both sockets and the context are released when forwarding stops,
    including when it is interrupted by an exception.
    """

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets ------------------------------------------------
    aLocalhostCentralContext = zmq.Context.instance()

    # Socket to talk to clients ------------------------------------------------------
    clients = aLocalhostCentralContext.socket( zmq.ROUTER )
    # Socket to talk to workers ------------------------------------------------------
    workers = aLocalhostCentralContext.socket( zmq.DEALER )

    try:
        clients.bind( url_client )
        # The inproc endpoint is bound before any worker thread connects to it
        workers.bind( url_worker )

        # Launch pool of worker threads ----------------------------------------------
        for _ in range(5):
            thread = threading.Thread( target = aWorker_asRoutine, args = ( url_worker, ) )
            thread.start()

        # zmq.proxy replaces the deprecated zmq.device( zmq.QUEUE, ... ) call
        zmq.proxy( clients, workers )
    finally:
        # Previously this clean-up sat after the blocking call and was skipped
        # whenever that call raised; in `finally` it always runs.
        clients.close()
        workers.close()
        aLocalhostCentralContext.term()

if __name__ == "__main__":
    main()
In [3]:
from concurrent.futures import ThreadPoolExecutor
import time
def wait_on_b():
    """Sleep five seconds, print the result of future ``b``, then return 5."""
    time.sleep(5)
    # b will never complete because it is waiting on a.
    outcome = b.result()
    print(outcome)
    return 5

def wait_on_a():
    """Sleep five seconds, print the result of future ``a``, then return 6."""
    time.sleep(5)
    # a will never complete because it is waiting on b.
    outcome = a.result()
    print(outcome)
    return 6


# Deliberate deadlock demo: future `a` runs wait_on_b, which waits on `b`,
# while future `b` runs wait_on_a, which waits on `a`. Neither can finish.
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
In [7]:
# Check whether future `a` has completed (it waits on b, which waits on a)
a.done()
Out[7]:
False
In [ ]:
 

Comments

Comments powered by Disqus