Concurrency Models in Python
- malshehri88
- 10 hours ago
- 11 min read
27 April 2025 / 13 min read

introduction
ℹ️This is part of a series of learning from Fluent Python by Luciano Ramalho.
Recently, my core focus has been on performance testing our proxy server for accessing LLMs in preparation for production within our internal infrastructure. The term concurrent users frequently comes up when exploring performance testing tools such as Locust. While I understand the general concept of concurrency, I had not deep-dive into concurrency in Python. Hence, I decided to start this article to better understand the core concepts and how they apply to Python.
To begin, let’s understand the concept of concurrency vs. parallelism as explained by Rob Pike (in the context of Go, but still relevant for Python to an extent):
For some, understanding concurrency vs. parallelism can be challenging, as they’re often used interchangeably. The following provides a clear distinction between the two:
💡Concurrency: Ability to handle multiple pending tasks, making progress one at a time or in parallel so that each of them eventually succeeds or fails.
💡Parallelism: Ability to execute multiple tasks at the same time. This requires multicore CPU, multiple CPUs, or GPU.
With that understanding, let’s dive into why concurrency in Python is often viewed as problematic, especially regarding its multithreading performance.
Issue with The Infamous GIL
Python’s Global Interpreter Lock (GIL) is a mutex, prevents multiple threads from executing Python bytecodes at once to ensure thread safety.
💡Thread: An execution unit within a single process, sharing the same memory space with other threads, which can risk corruption if multiple threads update the same object concurrently. Threads consume fewer resources than processes for the same tasks.
The main issue with the GIL is that it creates a performance bottleneck, limiting the parallelism of CPU-bound Python programs because only one thread can execute Python code at a time.
For example, let’s compute the Fibonacci numbers using 4 threads versus sequentially for 4 iterations:
# -*- coding: utf-8 -*-
import threading
import time
from functools import wraps
def timeit(func):
@wraps(func)
def timeit_wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
total_time = end_time - start_time
print(f"Time taken: {total_time:.4f} seconds")
return result
return timeit_wrapper
def fibonacci(n: int):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
def compute_fibonacci():
for i in range(35, 40):
print(f"Fibonacci({i}) = {fibonacci(i)}")
@timeit
def multithreaded_fibonacci(num_threads: int ):
threads = []
for in range(numthreads):
t = threading.Thread(target=compute_fibonacci)
threads.append(t)
t.start()
for t in threads:
t.join()
@timeit
def sequential_fibonacci(num_iterations: int):
for in range(numiterations):
compute_fibonacci()
The assumption is that with modern multi-core CPUs, running on 4 threads should provide an improvement over running sequentially for 4 iterations.
However, the results shows otherwise:
if name == "__main__":
multithreaded_fibonacci(4)
sequential_fibonacci(4)
>>> Time taken: 107.7314 seconds
>>> Time taken: 106.4314 seconds
As GIL prevents other threads from executing Python bytecode concurrently, multi-threaded performance was similar to single-threaded performance, and actually slower due to the overhead of context switching between threads.
Multi-threading to Multi-processing
If concurrency with multiple threads gives you trouble, let multiple processes set you free.
Given the dismal performance of multi-threading in CPU-bound tasks, it makes sense to use multiprocessing or concurrent.futures.ProcessPoolExecutor to leverage processes and execute tasks concurrently across multiple cores, despite the higher computational cost.
For example, if we were to repeat the setup using multiprocessing and run the task across 4 processes instead:
We noticed that the time taken is now one-fourth of the time required in either single-threaded or multi-threaded execution.
Processes are still preferable for CPU-bound tasks that require true parallelism, despite the complexity of communication between processes due to their isolated memory.
While the example above demonstrates how we can use multiple processes to speed up a task, one issue not highlighted is that when we delegate tasks to separate threads or processes, we do not explicitly call the function directly, and thus, we are unable to return values from the tasks.
However, with multiprocessing, we can use queues to store results from tasks completed by different processes and then retrieve those values in the main process that delegated the tasks.
An example would be coordinating multiple processes to perform 20 prime number checks:
# -*- coding: utf-8 -*-
import multiprocessing
import random
import time
from dataclasses import dataclass
from multiprocessing import queues
# will take some time
NUMBERS = [i for i in range(100_000_000)]
@dataclass
class PrimeResult:
n: int
prime: bool
elapsed: float
JobQueue = queues.SimpleQueue[int]
ResultQueue = queues.SimpleQueue[PrimeResult]
def is_prime(x):
if x < 2:
return False
else:
for n in range(2,x):
if x % n == 0:
return False
return True
def check(n: int) -> PrimeResult:
t0 = time.perf_counter()
res = is_prime(n)
return PrimeResult(n, res, time.perf_counter() - t0)
def worker(jobs: JobQueue, results: ResultQueue):
while n := jobs.get():
results.put(check(n))
results.put(PrimeResult(0, False, 0.0))
def start_jobs(procs: int, jobs: JobQueue, results: ResultQueue):
for _ in range(20):
r = random.randint(0, 100_000_000 - 1)
jobs.put(NUMBERS[r])
for _ in range(procs):
proc = multiprocessing.Process(target=worker, args=(jobs, results))
proc.start()
# add poison pills after starting all processes
for _ in range(procs):
jobs.put(0)
def report(procs: int, results: ResultQueue) -> int:
checked = 0
procs_done = 0
while procs_done < procs:
result = results.get()
if result.n == 0:
procs_done += 1
else:
checked += 1
label = "p" if result.prime else " "
print(f"{result.n:16} | {label} | {result.elapsed:.9f}")
return checked
def main():
procs = multiprocessing.cpu_count()
t0 = time.perf_counter()
jobs: JobQueue = multiprocessing.SimpleQueue()
results: ResultQueue = multiprocessing.SimpleQueue()
start_jobs(procs, jobs, results)
checked = report(procs, results)
elapsed = time.perf_counter() - t0
print(f"{checked} in {elapsed:.4f} seconds")
if name == "__main__":
main()
worker: A unit of task that accesses inter-process communication (IPC) mechanisms through the use of queues (one for available jobs and one for completed results).
start_jobs: Set the premise for the tasks, including the number of jobs for the process pool and using a poison pill to signal termination.
report: Monitor process behavior and track the number of completed jobs.
⚠️ Without any form of IPC, it is not possible to retrieve results from multiple processes because each process operates within its own isolated memory space.
Concurrency for Network I/O
When dealing with network I/O, concurrency is essential because we don’t want to idly wait for remote servers to send responses. While waiting for a response, the application can perform other tasks to maximize resource utilization.
Unlike CPU-bound tasks, multi-threading remains an appropriate model for handling I/O-bound tasks simultaneously.
For example, let’s attempt to retrieve the flag for each country (if available) from Flag Download:
sequential_flag_download.py
# -*- coding: utf-8 -*-
import time
from pathlib import Path
from typing import Callable, List
import httpx
DEST_DIR = Path("flags")
COUNTRIES = [
"Afghanistan",
"Albania",
"Algeria",
"Andorra",
"Angola",
"Antigua and Barbuda",
"Argentina",
"Armenia",
"Australia",
"Austria",
"Azerbaijan",
"Bahamas",
"Bahrain",
...
]
def save_image(image: bytes, filename: str):
if DEST_DIR.exists():
(DEST_DIR / filename).write_bytes(image)
def get_flags(name: str) -> bytes:
format_name = "_".join(name.split(" "))
url = BASE_URL.format(format_name)
resp = httpx.get(url, timeout=5, follow_redirects=True)
resp.raise_for_status()
return resp.content
def downloader(countries: List[str]):
for c in countries:
image = get_flags(c)
save_image(image, "-".join(c.split(" ")))
print(c, end=" ", flush=True)
return len(countries)
def main(downloader: Callable[[List[str]], int]):
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter()
count = downloader(COUNTRIES)
elapsed = time.perf_counter() - t0
print(f"\n{count} downloaded in {elapsed:.2f} seconds")
if name == "__main__":
main(downloader)
As you may notice, there’s nothing concurrent about the attempt above; we are simply iterating through a list of countries sequentially and saving the flags to a specified path. Consequently, the overall process takes a significant amount of time, as seen:
Concurrent Executors
However, life can be easier with concurrent.futures.Executors which encapsulate the pattern of creating multiple independent threads or processes and collecting results via a queue.
If you recall, the concept of queues was demonstrated in the multi-processing example, where a queue is used as an IPC mechanism to gather results from tasks completed by different processes. However, for threads, because they share the same memory space, communication and data sharing are much simpler. Nevertheless, careful management is required to avoid race conditions and ensure thread safety.
Similar to multi-processes, threads can collect results from workers using shared state variables or thread-safe queues. However, concurrent.futures simplifies this process by providing the ThreadPoolExecutor and ProcessPoolExecutor classes. These classes manage a pool of worker threads/processes and handle task distribution and result collection automatically, eliminating the need for manual setup and management.
To demonstrate, let’s convert the sequential process of downloading flags to use multiple threads instead:
concurrent_flag_download.py
# -*- coding: utf-8 -*-
from concurrent import futures
def worker(country):
image = get_flags(country)
save_image(image, "-".join(country.split(" ")))
print(country, end=" ", flush=True)
return country
def threadpool_downloader(countries: List[str]) -> int:
with futures.ThreadPoolExecutor() as executor:
res = executor.map(worker, countries)
return len(list(res))
def main(downloader: Callable[[List[str]], int]):
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter()
count = downloader(COUNTRIES)
elapsed = time.perf_counter() - t0
print(f"\n{count} downloaded in {elapsed:.2f} seconds")
if name == "__main__":
main(threadpool_downloader)
The only changes required are the introduction of a worker function and the use of ThreadPoolExecutor in the downloader function. The rest remains the same as in the sequential example.
By making minimal changes to the existing code, we can submit callables for execution in different threads, resulting in a performance increase:
>>> 191 downloaded in 9.62 seconds
Similarly, with concurrent.futures we can make use of multi-processing easily by using ProcessPoolExecutor instead:
def processpool_downloader(countries: List[str]) -> int:
with futures.ProcessPoolExecutor() as executor:
res = executor.map(worker, countries)
return len(list(res))
if name == "__main__":
main(processpool_downloader)
You can expect similar results:
>>> 191 downloaded in 10.22 seconds
You may notice that ProcessPoolExecutor is actually slower compared to ThreadPoolExecutor due to the overhead associated with starting and managing processes. This demonstrates the preference for ThreadPoolExecutor for I/O-bound tasks.
ℹ️The argument max_workers in either ThreadPoolExecutor or ProcessPoolExecutor defaults to None, which is computed as max_workers = min(32, os.cpu_count() + 4).
Understanding Futures
Futures as stated in Fluent Python are core components in concurrent.futures and asyncio. Simply put, a Future in either library represents a deferred computation that is pending completion.
In the official documentation as per Python 3.12.4, a Future “encapsulates” the asynchronous execution of a callable. A similar comparison to a Future would be a Promise in JavaScript.
ℹ️In asyncio, Futures can be found in asyncio.Future.
One key aspect of Future is that it should not be created manually but rather instantiated exclusively by the concurrency framework. This is because a Future represents a pending task that will eventually be executed, but it needs to be scheduled first — a responsibility of the concurrency framework. If you think about it, the status of each executed task should be managed by the concurrency framework, and application code should not interfere with changing the state. As such, we cannot control when the framework changes the state of the Future.
While executor.map handles Futures behind the scenes, we can use executor.submit to demonstrate the creation of a Future:
def threadpool_downloader(countries: List[str]) -> int:
tasks: List[futures.Future] = []
completed: int = 0
with futures.ThreadPoolExecutor() as executor:
for c in countries:
future = executor.submit(worker, country=c)
tasks.append(future)
print(f"scheduling future: {future}")
for future in futures.as_completed(tasks):
res: str = future.result()
print(f"Executed future: {res!r}")
completed += 1
return completed
You may try to re-run the entire downloading process, and this time you may notice the following outputs:
>>> scheduling future: <Future at 0x102df0700 state=running>
>>> scheduling future: <Future at 0x102df0a90 state=running>
>>> scheduling future: <Future at 0x102f0f370 state=running>
...
>>> Executed future: 'Armenia'
>>> Executed future: 'Barbados'
>>> Executed future: 'Bangladesh'
...
>>> 191 downloaded in 5.24 seconds
Based on our print statements, we understand that each call to executor.submit returns a Future object. Unlike executor.map, using executor.submit requires an additional for loop to explicitly retrieve the eventual results.
One further point to note about Future — calling its result method actually blocks the caller’s thread if the result is not ready. However, with concurrent.futures.as_completed, it does not block the thread as it is an iterator that yields each future as it completes.
Ease vs Flexibility
While executor.map is easy to use and produces the same results as using both executor.submit with concurrent.futures.as_completed, it is less flexible in dealing with different callables and arguments. executor.map is designed to run the same callable on different inputs to the same set of arguments. In contrast, executor.submit allows you to run different callables with different sets of arguments if needed.
To better illustrate the differences between executor.submit and executor.map, let’s consider how each method works:
executor_map_vs_submit.py
from concurrent import futures
def add(a: int, b: int) -> int:
return a + b
def square(a: int) -> int:
return a * a
def executor_map():
print("\nFutures from executor.map")
with futures.ThreadPoolExecutor() as executor:
# we cannot run `add` in the same threadpool
res = executor.map(square, [i for i in range(100)])
for r in res:
print(r)
def executor_submit():
print("\nFutures from executor.submit")
tasks = []
with futures.ThreadPoolExecutor() as executor:
for i in range(100):
add_future = executor.submit(add, a=i, b=i)
mul_future = executor.submit(square, a=i)
tasks.extend([add_future, mul_future])
for task in futures.as_completed(tasks):
print(task.result())
if name == "__main__":
executor_map()
executor_submit()
In general,
submit: Allows you to submit individual callable tasks to the executor. This means you can submit a variety of different tasks, each potentially performing a different function.
map: Used to apply the same callable to a collection of arguments, similar to the built-in map function. This is useful when you want to perform the same operation on multiple pieces of data concurrently.
Personally, it’s rather straightforward to opt for executor.map if you are dealing with a single function that you want to run concurrently. However, if additional logic comes into play under different conditions, then executor.submit would be the better choice.
Conclusion
Concurrency is crucial in modern programming, especially in an era where applications need to be highly accessible and scalable.
In Python, achieving concurrency can be challenging due to the Global Interpreter Lock (GIL), which can limit the performance benefits of multi-threading, particularly for CPU-bound tasks.
In this post, we explored how multi-threading might not always yield the expected performance gains due to the GIL’s limitations, especially for CPU-intensive operations. However, for I/O-bound tasks like downloading data from the internet, multi-threading can be highly effective.
Stay curious and keep learning! 🐍
ℹ️Many of the code chunks above are adapted from Fluent Python by Luciano Ramalho. I highly recommend you pick up a copy of the book as I find it extremely useful to understand Python on a deeper level.
Resources
https://kwahome.medium.com/concurrency-is-not-parallelism-a5451d1cde8d
© 2025 | Mohammed Alshehri. All rights reserved.
Comentarios