Threads and Processes in Python

Introduction

To apply concurrency in Python we can use threads or processes. To illustrate the techniques, I will use parallel for-loops throughout.

Threads

We can create a new thread for each iteration of a loop.

# SuperFastPython.com
# example of a parallel for loop with the Thread class
from threading import Thread

# execute a task
def task(value):
    # add your work here...
    # ...
    # all done
    print(f'.done {value}')

# protect the entry point
if __name__ == '__main__':
    # create all tasks
    threads = [Thread(target=task, args=(i,)) for i in range(20)]
    # start all threads
    for thread in threads:
        thread.start()
    # wait for all threads to complete
    for thread in threads:
        thread.join()
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is good for a small number of tasks.
  • It doesn't scale well: with many tasks, the threads will compete for the CPU and slow each other down.
  • The results from tasks can't be returned easily (see the sketch below for a workaround).
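
If you do need results back from plain threads, one workaround is to have each thread write into a shared structure. A minimal sketch, where the results list and lock are our own illustration (not part of the Thread API):

# example of collecting results from threads via a shared list
from threading import Thread, Lock

# shared results and a lock to guard them (our own addition, not a Thread feature)
results = []
lock = Lock()

# execute a task and record its result
def task(value):
    # add your work here...
    with lock:
        results.append(value)

# protect the entry point
if __name__ == '__main__':
    # create, start, and wait for all threads as before
    threads = [Thread(target=task, args=(i,)) for i in range(20)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # the shared list now holds one result per task
    print(results)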

ThreadPool

We can use a thread pool to reuse threads. This is a programming pattern that handles the creation and management of threads for us.

# SuperFastPython.com
# example of a parallel for loop with the ThreadPool class
from multiprocessing.pool import ThreadPool

# execute a task
def task(value):
    # add your work here...
    # ...
    # return a result, if needed
    return value

# protect the entry point
if __name__ == '__main__':
    # create the pool with the default number of workers
    with ThreadPool() as pool:
        # issue one task for each call to the function
        for result in pool.map(task, range(100)):
            # handle the result
            print(f'>got {result}')
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is good for running tasks that involve calling the same function many times with different arguments.
  • Beyond map, the pool offers variants for lazy evaluation, multiple arguments, and asynchronous calls, such as imap, starmap, and apply_async (see the sketch below).
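
For example, a quick sketch of some of these variants, reusing the same task() idea from above plus a hypothetical two-argument task2():

# example of other ThreadPool methods beyond map
from multiprocessing.pool import ThreadPool

# a single-argument task
def task(value):
    return value

# a hypothetical task that takes two arguments
def task2(a, b):
    return a + b

# protect the entry point
if __name__ == '__main__':
    with ThreadPool() as pool:
        # imap: a lazy version of map that yields results in order
        for result in pool.imap(task, range(10)):
            print(f'>got {result}')
        # starmap: call the function with multiple arguments per task
        for result in pool.starmap(task2, [(1, 2), (3, 4)]):
            print(f'>got {result}')
        # apply_async: issue one task asynchronously and fetch the result later
        async_result = pool.apply_async(task, (99,))
        print(f'>got {async_result.get()}')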

ThreadPoolExecutor

We can create a pool of worker threads using the ThreadPoolExecutor class, which provides a modern executor interface.

This allows tasks to be issued as one-off tasks via the submit() method, which returns a Future object that provides a handle on the task. It also allows the same function to be called many times with different arguments via the map() method.

# SuperFastPython.com
# example of a parallel for loop with the ThreadPoolExecutor class
import concurrent.futures

# execute a task
def task(value):
    # add your work here...
    # return a result, if needed
    return value

# protect the entry point
if __name__ == '__main__':
    # create the pool with the default number of workers
    with concurrent.futures.ThreadPoolExecutor() as exe:
        # issue some tasks and collect futures
        futures = [exe.submit(task, i) for i in range(50)]
        # handle results as tasks are completed
        for future in concurrent.futures.as_completed(futures):
            print(f'>got {future.result()}')
        # issue one task for each call to the function
        for result in exe.map(task, range(50)):
            print(f'>got {result}')
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is the preferred approach for running parallel for-loops.
  • It is good for one-off tasks as well as many calls to the same function with different arguments.

Processes

We can create a new child process for each iteration of the loop.

# SuperFastPython.com
# example of a parallel for loop with the Process class
from multiprocessing import Process

# execute a task
def task(value):
    # add your work here...
    # ...
    # all done
    print(f'.done {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create all tasks
    processes = [Process(target=task, args=(i,)) for i in range(20)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to complete
    for process in processes:
        process.join()
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is good for a small number of tasks.
  • It doesn't scale well: with more tasks than CPU cores, the processes will compete for the CPU and slow each other down.
  • The results from tasks can't be returned easily (see the sketch below for a workaround).
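
If you need results back from plain child processes, one workaround is to pass each process a multiprocessing.Queue. A minimal sketch:

# example of collecting results from child processes via a queue
from multiprocessing import Process, Queue

# execute a task and put the result on the shared queue
def task(value, queue):
    # add your work here...
    queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # create a shared queue and pass it to each process
    queue = Queue()
    processes = [Process(target=task, args=(i, queue)) for i in range(20)]
    for process in processes:
        process.start()
    # drain the queue before joining, one result per process
    results = [queue.get() for _ in processes]
    for process in processes:
        process.join()
    print(results)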

Pool

We can create a pool of worker processes that can be reused for many tasks.

This can be achieved using the Pool class that will create one worker for each logical CPU core in the system.

The Pool class can be created using the context manager interface, which ensures that it is closed and all workers are released once we are finished with it.

# SuperFastPython.com
# example of a parallel for loop with the Pool class
from multiprocessing import Pool

# execute a task
def task(value):
    # add your work here...
    # ...
    # return a result, if needed
    return value

# protect the entry point
if __name__ == '__main__':
    # create the pool with the default number of workers
    with Pool() as pool:
        # issue one task for each call to the function
        for result in pool.map(task, range(100)):
            # handle the result
            print(f'>got {result}')
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is good for running tasks that involve calling the same function many times with different arguments.
  • Beyond map, the pool offers variants for lazy evaluation, multiple arguments, and asynchronous calls, such as imap_unordered, starmap, and map_async (see the sketch below).
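
For example, a quick sketch of some of these variants, again with a hypothetical two-argument task2():

# example of other Pool methods beyond map
from multiprocessing import Pool

# a single-argument task
def task(value):
    return value

# a hypothetical task that takes two arguments
def task2(a, b):
    return a + b

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        # imap_unordered: lazy, yields results in completion order
        for result in pool.imap_unordered(task, range(10)):
            print(f'>got {result}')
        # starmap: call the function with multiple arguments per task
        for result in pool.starmap(task2, [(1, 2), (3, 4)]):
            print(f'>got {result}')
        # map_async: a non-blocking map that returns an AsyncResult
        async_result = pool.map_async(task, range(5))
        print(f'>got {async_result.get()}')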

ProcessPoolExecutor

We can create a pool of worker processes using the ProcessPoolExecutor class, which provides a modern executor interface.

This allows tasks to be issued as one-off tasks via the submit() method, which returns a Future object that provides a handle on the task. It also allows the same function to be called many times with different arguments via the map() method.

# SuperFastPython.com
# example of a parallel for loop with the ProcessPoolExecutor class
import concurrent.futures

# execute a task
def task(value):
    # add your work here...
    # return a result, if needed
    return value

# protect the entry point
if __name__ == '__main__':
    # create the pool with the default number of workers
    with concurrent.futures.ProcessPoolExecutor() as exe:
        # issue some tasks and collect futures
        futures = [exe.submit(task, i) for i in range(50)]
        # process results as tasks are completed
        for future in concurrent.futures.as_completed(futures):
            print(f'>got {future.result()}')
        # issue one task for each call to the function
        for result in exe.map(task, range(50)):
            print(f'>got {result}')
    # report that all tasks are completed
    print('Done')

Keep in mind

  • This is the preferred approach for running parallel for-loops.
  • It is good for one-off tasks as well as many calls to the same function with different arguments.

What to use?

Whether to use threads or processes depends on the problem you are trying to solve.

Thread-based concurrency is good for I/O-bound tasks, such as reading and writing files, network communication, database access, interacting with devices, etc. When one thread blocks waiting for I/O, the GIL is released and another thread can run. It is not good for CPU-bound tasks, because the GIL prevents the threads from executing Python code in parallel.
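
For example, a minimal sketch of an I/O-bound loop run with threads (the URL list is a placeholder):

# example of threads applied to an I/O-bound task: downloading URLs
import concurrent.futures
import urllib.request

# placeholder list of URLs to fetch
URLS = ['https://example.com/'] * 5

# download one URL and report its size in bytes
def fetch(url):
    with urllib.request.urlopen(url) as response:
        return len(response.read())

# protect the entry point
if __name__ == '__main__':
    # the GIL is released while each thread waits on the network
    with concurrent.futures.ThreadPoolExecutor() as exe:
        for size in exe.map(fetch, URLS):
            print(f'>got {size} bytes')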

Process-based concurrency is good for CPU-bound tasks like calculating, parsing, encoding, and modeling, such as image processing, video encoding, machine learning, etc. If you run this type of work in threads, the GIL will serialize the tasks; each child process has its own interpreter and its own GIL.
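
For example, a minimal sketch of a CPU-bound loop run with processes (the sum-of-squares function is just a stand-in for real work):

# example of processes applied to a CPU-bound task
import concurrent.futures

# a stand-in for CPU-heavy work: sum of squares up to n
def heavy(n):
    return sum(i * i for i in range(n))

# protect the entry point
if __name__ == '__main__':
    # each worker process has its own interpreter and its own GIL
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for result in exe.map(heavy, [10_000_000] * 4):
            print(f'>got {result}')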

Prefer the ThreadPoolExecutor and ProcessPoolExecutor classes to run parallel for-loops:

  • A better interface for issuing tasks and handling results.
  • Better performance, because only as many tasks run at once as there are workers.
  • Better performance from reusing threads or processes across tasks.
  • Submitted tasks are queued and run as soon as a worker is available (see the sketch below).
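
For example, you can cap how many tasks run at once with the max_workers argument; the remaining tasks simply wait in the queue until a worker frees up (the value 4 here is arbitrary):

# example of capping the number of workers
import concurrent.futures

# execute a task
def task(value):
    return value

# protect the entry point
if __name__ == '__main__':
    # at most 4 tasks run at once; the rest wait in the queue
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as exe:
        for result in exe.map(task, range(100)):
            print(f'>got {result}')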

References

  1. Thread
  2. ThreadPool
  3. ThreadPoolExecutor
  4. Process
  5. Pool
  6. ProcessPoolExecutor