Easy concurrency with Python Shared Object


Project repository.
A year-old article about the general concepts of the project.


So you want to build a multitasking system using Python? But you hesitate, because you know you'll have to either use the multiprocessing module, which is slow and/or somewhat inconvenient, or a more powerful external tool like Redis or RabbitMQ, or even a large DBMS like MongoDB or PostgreSQL, which require some glue code (i.e. very far from native Python code) and apply their own restrictions on what you can do with your data. If you think «why do I need so much hassle if I just want to run a few worker threads in Python, using the data structures I already have in my program and the functions I've already written? I just want to run this code in threads! Oh, I wish there was no GIL in Python» — then welcome to the club.


Of course, many of us could build a decent tool from scratch that would make use of multiple cores. However, reusing already existing working software (Pandas, TensorFlow, SciPy, etc.) is always cheaper than developing new software. But the status quo in CPython tells us one thing: you cannot remove the GIL, because everything is based on the GIL. Although turning shit into gold takes much work, the ability to ease the transition from slow single-threaded shit to slow not-so-single-threaded gold-looking shit might be worth it, because you won't have to rewrite your whole system from scratch.



The original unattainable goal was to make a simple CPython program with ordinary-looking data structures and code lines magically work in parallel on multiple cores. Of course, that is easier said than done — there might be valid reasons why nobody has managed to create anything similar in 30 years of CPython's existence. Over time it became apparent to me that the implementation-defined nature of CPython, with its infinite amount of quirks and ad-hoc solutions, makes it incredibly difficult to imitate native behavior really seamlessly.


Multiprocessing comes really close to being native-looking and composable, but, damn, multiprocessing is slow and painful as death from cancer. Within the multiprocessing library you can see some shy attempts to implement shared memory (e.g. multiprocessing.sharedctypes) — and indeed my project can be seen as a large overhaul and extension of sharedctypes. I was actually considering the use of sharedctypes in my very early prototypes, but quickly realized that sharedctypes is way too limiting and provides few features of its own; it is a kind of half-baked quick solution holding the fort until someone finds the right one.


So let's take a look at the current state of my attempt (codename PSO, or Python Shared Objects) with the help of a few examples. The immediate benefit of the project is the ability to share complex dynamic objects between processes in a totally symmetric fashion, i.e. to transfer a modified object from parent process to child, from child to parent, or from child to child. So far, every other alternative for CPython either requires the data to be of some simple fixed shape (like multiprocessing.ShareableList) or requires the use of pickle to pass complex objects across the process boundary on each mutation or read.


python3 examples/simple_workers.py


import sys
import pso
import subprocess

if len(sys.argv) != 3:
    # main process
    coord_name = pso.init()
    pso.root().requests = [{ 'idx': i, 'request': f'request {i}' }
        for i in range(10)]
    pso.root().responses = [None]*10
    workers = [subprocess.Popen([sys.executable, sys.argv[0], coord_name, str(req['idx'])])
        for req in pso.root().requests]
    statuses = [w.wait(5) for w in workers]
    for (idx, response) in pso.root().responses:
        print((idx, response))
else:
    # worker process
    pso.connect(sys.argv[1])
    myindex = int(sys.argv[2])
    myrequest = pso.root().requests[myindex]
    import time, random
    time.sleep(random.randrange(2)) # imitate some long work
    pso.root().responses[myindex] = \
        (myindex, f'response {myindex} for {myrequest["request"]}')

In this example you can see the basic usage of shared data structures: requests are passed from the main process to workers by simple assignment to pso.root() (which is a dictionary-like object shared by all processes connected to the same coordinator), and responses are returned as a list of tuples by the same direct assignment. Printing (like many other features) of some shared structures (list and tuple) is not fully implemented, so in this example they are copied and iterated in the native CPython way instead of a simple print(). Immutable data is implicitly copied into native "str" and "int" objects, so it is not a big problem here.


Actually, in this example modifications of data are performed by what is internally called a "transient transaction". These bear some similarity to the "read committed" isolation mode in SQL, i.e. the operation sees the last committed changes made by other transactions. Don't be fooled by the analogy, though — a transaction happens on each small access operation; for example, pso.root().responses[myindex] executes two transient transactions: one for the pso.root().responses attribute lookup and one for the responses[myindex] indexing. As far as I know, PyPy STM implements this very kind of transactional access.
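
Since each dotted access is its own transient transaction, a cheap micro-optimization follows from this (a hypothetical sketch, assuming a worker that has already called pso.connect()): cache the shared container in a local variable to avoid repeating the attribute lookup.

# Sketch: caching the shared list halves the number of transient
# transactions per iteration (one instead of two).
import pso

responses = pso.root().responses         # one transient transaction
for i in range(3):
    responses[i] = (i, f'response {i}')  # one transient transaction each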


At present I haven't tried to abstract away the worker launch and parameter passing (like multiprocessing does), so you are free to start workers any way you want. Actually, you can even use multiprocessing for that purpose. Just don't use fork from the main process (the coordinator), because the current implementation runs two threads in the main process, and fork is mostly incompatible with multithreading.
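
For instance, a spawn-based multiprocessing launcher should be safe; here is a minimal sketch under that assumption (the pso calls mirror the example above):

import multiprocessing as mp
import pso

def worker(coord_name, idx):
    pso.connect(coord_name)
    pso.root().responses[idx] = (idx, f'response {idx}')

if __name__ == '__main__':
    coord_name = pso.init()
    pso.root().responses = [None] * 4
    # spawn, not fork: the coordinator process is multithreaded
    ctx = mp.get_context('spawn')
    procs = [ctx.Process(target=worker, args=(coord_name, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()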


python3 -m pso examples/accounts.pso.py


import sys
import random
import pso
import subprocess

class Account(pso.ShmObject):
    def __init__(this, id, starting_value):
        this.id = id
        this.balance = starting_value

number_of_accounts = 200

if len(sys.argv) != 3 or sys.argv[1] != 'worker':
    # main process
    coord_name = pso.init()

    pso.root().worker_count = 3
    if len(sys.argv) >= 2:
        pso.root().worker_count = int(sys.argv[1])

    pso.root().accounts = {f'client{i}': Account(i, random.randrange(1000))
        for i in range(number_of_accounts)}
    original_values = [pso.root().accounts[f'client{i}'].balance
        for i in range(number_of_accounts)]
    original_sum = sum(original_values)
    print(f'Original sum: {original_sum}')
    print(f'{original_values}')

    print(f'Launching account processing with {pso.root().worker_count} workers')
    workers = [subprocess.Popen([sys.executable, '-m', 'pso', sys.argv[0], 'worker', coord_name])
        for i in range(pso.root().worker_count)]
    statuses = [w.wait(None) for w in workers]

    new_values = [pso.root().accounts[f'client{i}'].balance
        for i in range(number_of_accounts)]
    new_sum = sum(new_values)
    print(f'New sum: {new_sum}')
    print(f'{new_values}')
    if new_sum != original_sum:
        raise Exception('Sums do not match')
else:
    # worker process
    pso.connect(sys.argv[2])
    accounts = pso.root().accounts
    for i in range(100*1000 // pso.root().worker_count):
        source_index = random.randrange(number_of_accounts)
        target_index = random.randrange(number_of_accounts)
        source = f'client{source_index}'
        target = f'client{target_index}'

        amount = random.randrange(1, 50)
        if source != target:
            with transaction:
                if amount <= accounts[source].balance:
                    accounts[source].balance -= amount
                    accounts[target].balance += amount

How it works: the main process initializes several accounts with random balance values. Here accounts are instances of Account(pso.ShmObject) — only subclasses of pso.ShmObject can be shared. Then multiple workers are started; each worker executes a long loop where it randomly picks two accounts and moves some amount from one account to the other in a transactional fashion, i.e. either the transaction succeeds and both accounts are updated, or no updates are applied and the transaction is restarted. So here you can see the big new features: explicit transactions, invocation via the "pso" module to handle the with transaction sugar, and shared class instances. Yes, the solution you see right now requires some modifications and imposes some restrictions on the code, but the cup is still half full.


If you try to remove with transaction, you'll get the "Sums do not match" exception, because the exchange would be performed in four transient transactions (actually six, with the precondition). You might find it similar to a regular "with lock", except under the hood it handles fine-grained locks for you, so you won't be bothered by deadlocks, resource starvation, or poor concurrency due to coarse-grained locking — that kind of protection is scarce among out-of-the-box, easy-to-use solutions.


with transaction is invalid CPython code, because there is no "transaction" context manager. Actually, that line does not stand for a context manager but implements an infinite loop enclosing the block of code. It's sad to realize that after 30 years of "development" CPython is still missing a convenient way to pass a block of code into a function. Instead of a single real solution we now have multiple small ad-hoc ones: lambdas (mostly useless until the recent introduction of the walrus operator), list comprehensions where a regular loop would suffice, context managers and function decorators — all placating any pursuit of explicit in-place passing of code blocks as function arguments. Of course, you can still do something like:


    def code_block(amount, accounts):
        if amount <= accounts[source].balance:
            accounts[source].balance -= amount
            accounts[target].balance += amount

    pso.transaction(code_block, amount, accounts)

But let's face it — this construct is significantly harder to read and write than a simple with transaction:, especially when you need to add hundreds of these to existing code. That's why the "pso" loader is required to interpret with transaction: in the main module. The "pso" module also installs hooks to automatically translate other modules named "*.pso.py"; for the main module, however, you need the "-m pso" option.


Now that you know it's actually a loop, you'll understand why actions on non-shared data are supposed to be either pure (free of side effects, i.e. not modifying non-local data) or idempotent (like a singleton). For shared data the implementation handles commits and rollbacks automatically. And that's why you cannot just put a transaction block around the whole code — that would mean running it atomically, most likely causing the remaining workers to wait. It's interesting to note that even such a poorly designed transaction will work, i.e. it will not hang indefinitely on long transactions under very high contention like naive implementations of STM do (notably the one built into GHC). In my implementation of STM I tried to handle such cases as if execution were performed serially, using a single global lock like the GIL. So if anything goes wrong — you'll get your GIL back.
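
To make the retry pitfall concrete, here is a contrived sketch (the attempts list is my hypothetical addition to the accounts example; the code assumes the pso loader is active): a side effect on private data escapes aborted attempts, while shared data is rolled back.

attempts = []  # private, non-shared list: PSO cannot roll it back

with transaction:
    attempts.append(1)                    # NOT pure: survives aborted attempts
    if amount <= accounts[source].balance:
        accounts[source].balance -= amount
        accounts[target].balance += amount

# len(attempts) may be greater than 1 if the transaction was restarted;
# the shared balances, in contrast, are committed exactly once.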


A small note on class sharing. As already said, you can only share instances of pso.ShmObject. Class definitions are private to each process, which is why they should always remain static. The current implementation uses simple "module.classname" naming to find the appropriate class when passing an object from one process to another, therefore nested and dynamically created classes are not supported (yet).
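
A quick sketch of what that naming scheme implies (the module name is hypothetical):

# mymodule.py, importable by every process
import pso

class Good(pso.ShmObject):
    pass            # resolvable as "mymodule.Good" in any process

def factory():
    class Bad(pso.ShmObject):   # "mymodule.factory.<locals>.Bad": nested,
        pass                    # so other processes cannot look it up (yet)
    return Bad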


Also, I plan on removing the pso.root() mantra for *.pso.py modules, replacing it with regular global variable access syntax, but that is going to require more complex metaprogramming.


It's time for some benchmarks. Here I have a similar program implemented using regular CPython:


python3 examples/accounts_serial.py
import random

class Account:
    def __init__(this, id, starting_value):
        this.id = id
        this.balance = starting_value

number_of_accounts = 200

accounts = {f'client{i}': Account(i, random.randrange(1000))
    for i in range(number_of_accounts)}
original_values = [client.balance for client in accounts.values()]
original_sum = sum(original_values)
print(f'Original sum: {original_sum}')
print(f'{original_values}')

for i in range(100*1000):
    source_index = random.randrange(number_of_accounts)
    target_index = random.randrange(number_of_accounts)
    source = f'client{source_index}'
    target = f'client{target_index}'

    amount = random.randrange(1, 50)
    if source != target:
        if amount <= accounts[source].balance:
            accounts[source].balance -= amount
            accounts[target].balance += amount

new_values = [client.balance for client in accounts.values()]
new_sum = sum(new_values)
print(f'New sum: {new_sum}')
print(f'{new_values}')

if new_sum != original_sum:
    raise Exception('Sums do not match')

and implemented using multiprocessing:


python3 examples/accounts_multiprocessing.py
import sys
import random
from multiprocessing import Process, Manager, Lock

class Account():
    def __init__(this, id, starting_value):
        this.id = id
        this.balance = starting_value

def worker(accounts, locks, worker_count):
    local_locks = dict(locks)
    for i in range(100*1000 // worker_count):
        source_index = random.randrange(number_of_accounts)
        target_index = random.randrange(number_of_accounts)
        source = f'client{source_index}'
        target = f'client{target_index}'
        amount = random.randrange(1, 50)
        if source != target:
            if worker_count == 1:
                if amount <= accounts[source].balance:
                    accounts[source].balance -= amount
                    accounts[target].balance += amount
            else:
                # acquire locks from lowest index to highest to avoid deadlocks
                if source_index <= target_index:
                    lock1, lock2 = local_locks[source], local_locks[target]
                else:
                    lock1, lock2 = local_locks[target], local_locks[source]

                if not lock1.acquire():
                    raise Exception(f'failed to get lock {source}-{target}')
                try:
                    if not lock2.acquire():
                        raise Exception(f'failed to get lock {source}-{target}')
                    try:
                        if amount <= accounts[source].balance:
                            accounts[source].balance -= amount
                            accounts[target].balance += amount
                    finally:
                        lock2.release()
                finally:
                    lock1.release()

worker_count = 3

number_of_accounts = 200

if __name__ == '__main__':
    if len(sys.argv) >= 2:
        worker_count = int(sys.argv[1])

    manager = Manager()
    accounts = manager.dict({f'client{i}': Account(i, random.randrange(1000))
        for i in range(number_of_accounts)})
    locks = manager.dict({f'client{i}': manager.Lock()
        for i in range(number_of_accounts)})

    original_values = [accounts[f'client{i}'].balance
        for i in range(number_of_accounts)]
    original_sum = sum(original_values)
    print(f'Original sum: {original_sum}')
    print(f'{original_values}')

    print(f'Launching account processing with {worker_count} workers')
    workers = [Process(target=worker, args=(accounts, locks, worker_count))
        for i in range(worker_count)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    new_values = [accounts[f'client{i}'].balance
        for i in range(number_of_accounts)]
    new_sum = sum(new_values)
    print(f'New sum: {new_sum}')
    print(f'{new_values}')

Benchmarks were performed on VirtualBox Debian VM running on 4-core i5-6600, so it might be significantly slower than bare-metal.


Results

Regular CPython:

Command being timed: "python3 examples/accounts_serial.py"
User time (seconds): 0.91
System time (seconds): 0.00
Percent of CPU this job got: 99%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:00.93

PSO, single worker:

Command being timed: "python3 -m pso examples/accounts.pso.py 1"
User time (seconds): 3.83
System time (seconds): 0.17
Percent of CPU this job got: 98%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:04.07

PSO, 3 workers:

Command being timed: "python3 -m pso examples/accounts.pso.py 3"
User time (seconds): 4.68
System time (seconds): 0.25
Percent of CPU this job got: 270%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.82

multiprocessing, single worker (without locks):

Command being timed: "python3 examples/accounts_multiprocessing.py 1"
User time (seconds): 39.36
System time (seconds): 12.80
Percent of CPU this job got: 83%
Elapsed (wall clock) time (h:mm:ss or m:ss): 1:02.20

multiprocessing, 3 workers:

Command being timed: "python3 examples/accounts_multiprocessing.py 3"
User time (seconds): 65.16
System time (seconds): 46.71
Percent of CPU this job got: 149%
Elapsed (wall clock) time (h:mm:ss or m:ss): 1:14.67

Unfortunately, the naive multiprocessing solution is so incredibly slow that I had to optimize it a little: since a lock's proxy object is immutable, I saved the locks into a separate private dict for each worker (local_locks). The same trick cannot be done for Account instances, because mutations of those objects would become invisible to other processes.


As you see, multiprocessing fails to provide any concurrency, and it's very slow even for a single worker. Of course, you might reimplement it using RawValue/RawArray and sharedctypes, or use some other kind of ad-hoc optimization, and come close to PSO, but you are going to lose all the convenience multiprocessing provides. In the end you could implement it in pure C and beat even the regular CPython version. But in that case you would not be writing Python code in the first place.


In contrast, out of the box PSO provides decent concurrency with a reasonable footprint. Actually, even if you raise the contention rate by lowering the number of accounts from 200 to 20, you get the same results:


Same results
Command being timed: "python3 examples/accounts_20.pso.py 3"
User time (seconds): 4.80
System time (seconds): 0.23
Percent of CPU this job got: 279%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.80

Despite this, the PSO single-worker version is still 4-5 times slower than regular CPython. Future optimizations might bring those numbers down to "2-3 times slower", but shared transactional memory is not completely free anyway, and you just cannot outplay the ad-hoc optimizations that have been introduced into CPython over the last 30 years. For that reason you should use private Python objects for relatively compute-intensive tasks (and C/C++ extensions for the most compute-intensive tasks) and reserve shared objects for interprocess communication.
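
A rough sketch of that division of labor (hypothetical names, assuming a connected worker): copy the inputs out of shared memory, compute on private objects, and publish a single result.

import pso

# copy shared inputs into private CPython objects (immutable values are
# copied implicitly, as noted earlier)
requests = [r['request'] for r in pso.root().requests]

# the compute-intensive part runs entirely on private objects
total = sum(len(text) for text in requests)

# a single shared write publishes the result
pso.root().result = total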


The last (but not least) example shows some non-native-looking features of PSO, introduced specifically for concurrency:


python3 examples/producer_consumer.py


import sys
import pso
import subprocess
import time

class Queue(pso.ShmObject):
    termination_mark = False
    def __init__(this):
        this.deque = pso.ShmList()
        this.promise = pso.ShmPromise()

    @pso.transaction
    def put(this, value):
        this.deque.append(value)
        this.promise.signal(True)

    def pop(this):
        rslt = this.deque.popleft()
        while rslt is None:
            this.promise.wait(0) # cannot be waited inside transaction
            this.promise = pso.ShmPromise()
            rslt = this.deque.popleft()

        if rslt is this.termination_mark:
            return (None, True)
        else:
            return (rslt, False)

    def terminate(this):
        this.terminated = True
        this.promise.signal(True)


if len(sys.argv) != 3 or sys.argv[1] != 'worker':
    # main process
    coord_name = pso.init()

    pso.root().queue = queue = Queue()

    worker_count = 3
    if len(sys.argv) >= 2:
        worker_count = int(sys.argv[1])
    workers = [subprocess.Popen([sys.executable, sys.argv[0], 'worker', coord_name])
        for i in range(worker_count)]

    items = []
    ends = 0
    while ends < worker_count:
        (item, eoq) = queue.pop()
        if eoq:
            ends += 1
        print(item)

    print(f'ShmList final state: {list(queue.deque)}')
    statuses = [w.wait(None) for w in workers]
    input("Press Enter to continue...")
else:
    # worker process
    pso.connect(sys.argv[2])
    queue = pso.root().queue
    for i in range(100):
        queue.put(i)
    queue.put(queue.termination_mark)

This is just a simple "multiple concurrent producers — single consumer" application. I might consider building support for queue/channel functionality right into my library, but currently I want to emphasize that this is a producer-consumer application written using basic data structures for the most part — no special libraries or other external software required.


So the new features are the ShmList.popleft() method and the ShmPromise class. popleft() implements a kind of deque interface, although not nearly as feature-rich as collections.deque. Thus you can use ShmList as a queue, pushing elements with append() and popping them with popleft().
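
Stripped to its essence (assuming an initialized pso environment), the queue surface looks like this; judging by Queue.pop() above, popleft() returns None when the list is empty:

import pso

q = pso.ShmList()
q.append('job-1')                 # producer side
q.append('job-2')
first = q.popleft()               # 'job-1', FIFO order
empty = pso.ShmList().popleft()   # None: nothing to pop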


ShmPromise is the only built-in synchronization primitive at the moment. It is created in the unsignalled state, which causes all wait() calls on it to block. When signalled via signal(), it resumes execution of all the waiters and causes all subsequent wait() calls to return immediately. That is a one-shot event, like JavaScript's promise. It's simple, and lots of complex scenarios can be implemented with it, but each Promise instance can only be used for a single signal, so for multiple signals you will need to create multiple Promises. A promise's signal() can also participate in a transaction (signalling atomically after the transaction succeeds), but wait() should always be called outside of a transaction — you are not supposed to wait for anything inside a transaction anyway, and implementing dependent transactions would make an already hard project much harder.
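
The one-shot nature means a consumer replaces a consumed promise with a fresh one, exactly as Queue.pop() does above; the bare pattern looks like this (a sketch, assuming connected processes on both sides):

import pso

pso.root().promise = pso.ShmPromise()

# producer process:
pso.root().promise.signal(True)        # wakes current and future waiters

# consumer process:
pso.root().promise.wait(0)             # returns once signalled
pso.root().promise = pso.ShmPromise()  # re-arm: one signal per instance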


Also, you might have noticed that I used the @pso.transaction decorator instead of with transaction, because the "put" method can be passed as an argument just fine. And there's no "-m pso" option, since it would do nothing for this particular program.


Technical details of implementation (boring part)


A Python program expects sequential execution. The easiest way, which CPython's developers chose, is to just execute the program totally sequentially (a.k.a. the GIL) and pretend there is no problem. However, this very approach creates a new problem: time has passed, and now you cannot get rid of the GIL, because the whole compiler, the standard library, and all third-party code heavily rely on the GIL, so the problem gets bigger and bigger every year. The problem is so neglected that the community cannot even implement subinterpreters:


https://www.python.org/dev/peps/pep-0554/ — PEP 554 — Multiple Interpreters in the Stdlib
https://github.com/ericsnowcurrently/multi-core-python


It's been about 2 years since I started my project on shared objects, and subinterpreters are still work in progress. Even if completed, subinterpreters only provide you with separate interpreters, each thinking it's the only one and running commands one by one (in a total serial order).


The designers of subinterpreters suggest many ways of communication, except they don't suggest shared memory. Of course, you might have heard of https://golang.org/doc/effective_go#sharing:

Do not communicate by sharing memory; instead, share memory by communicating.

However, a system made of independent tasks communicating over channels can be hard to reason about despite the absence of races. Shared data, in turn, has the following drawbacks:
  • races, mentioned in the Go article. So far the biggest barrier against mass use of multithreading. My project mostly solves this problem (with caveats) by the introduction of transparent transactions, as mentioned earlier;
  • cache misses. Modern CPUs are optimized for caching core-private data and much less optimized for shared data, especially in the case of NUMA systems. Actually, for CPython this aspect is less significant, because the run time of a single command is comparable to a cache-miss delay;
  • crash recovery. I doubt a Go program can continue running after something like a segmentation fault. So far very few tools have solved this problem successfully, notably Erlang. I tried to introduce some redundancy and recovery features, so that later it would be possible to implement recovery from a single worker crash, which presently might lead to a complete halt of processing due to abandoned locks.

Our best bet for multitasking with shared data is to find some way to make it look like a sequential execution of actions on private data, while in fact the actions are performed simultaneously on shared data. So far, the only practical way to do that is to employ transactional access with fine-grained locking. Even more «fine-grained» is read-write locking: tedious to implement, but the benefit of being able to perform multiple concurrent reads is just too huge to ignore.


However, no matter whether you are using optimistic or pessimistic locking, with fine-grained locking you eventually run into the need to roll back a transaction, e.g. when you run into a deadlock scenario. So here is the second requirement for the system: the ability to keep at least two versions of the data. Yeah, I know there are multiversioned/persistent data structures, but those are not free, and Python is just not suited for managing snapshots anyway.


The third requirement comes from the fact that operations in Python might take an extended period of time. That's okay for Python, but many concurrent algorithms assume the «critical» (concurrently run) segment of code is as short as possible, so that there probably won't be too much contention. We cannot force people to rewrite their programs, so we should expect the concurrent tasks to eventually all wait and contend for the same resource, with some or all of the transactions being long-running. Simple optimistic (lazy) STM implementations would deliver horrible performance in that scenario (like the mentioned GHC one), even worse than fully sequential execution with the GIL. Naive locking would favor short transactions and possibly never complete long ones. That's why fairness is a must.



So, the project seems pretty simple at first glance; that's what I thought when starting it. Over time, several things became apparent:


  • there are no full-fledged open-source implementations of a shared memory manager. There are some building blocks available here and there, but they all require lots of work to turn into a ready-to-use solution. That's partially because there are no agreed unified ways of pointer translation between processes (shared memory regions reside at different addresses in different processes);
  • there are no ready-made solutions for debugging and testing multitasking systems. There are lots of tools for single-threaded programs, there are some tools for multithreaded programs, and there are practically none for multiprocess ones;
  • although there are lots of implementations of read-write locks, the requirement of fairness and cancelability across multiple locks brings it to a completely different level. To implement fairness, we need to be able to cancel a newer and/or shorter transaction so that older/longer transactions are able to complete successfully. Doing so with a set of multiple locks means that the acquisition of one additional lock in one transaction might require the release of the whole set of locks acquired previously by another transaction, or that one lock would have to wait for the whole set of locks to be released;
  • reclaiming memory in a concurrent environment is a pain. Lock-based reference counting has horrible performance, just like hazard pointers. Tracing garbage collection is a good candidate, but implementing it efficiently is hard, and the need to stop every worker even for a brief duration actually means an indeterminate wait time, because we have very few wait points. So I created a significantly extended version of quiescent-state reclamation, which is also used by the Linux kernel. Epoch-based reclamation (similar to quiescent-state-based) requires all tasks to make progress for reclamation to happen, which is not viable for the same reason as stop-the-world garbage collectors — a CPython program may pause for an indeterminate period of time.

Shared memory manager


It was created with Hoard as a prototype, with separate heaps for each worker process — a strategy that underutilizes memory and leads to a higher footprint but provides superior performance, which is why it is used by all efficient multithreaded memory managers, like Hoard, jemalloc, tcmalloc, and mimalloc. The memory manager is still not production-ready though, because it does not support large memory blocks and cannot release completely free pages of memory back to the OS.


The problem of pointer translation was resolved using a kind of page table: the higher bits of a pointer indicate a shared segment index (page index), and the lower bits indicate the offset from the start of that page. Thus, at the very low cost of a page-offset lookup, I got sharable pointers that work in every process. Although it's not resistant to Byzantine faults like the hash-table lookups often used by OSes, we don't expect the processes to be malicious anyway.
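
A toy model of that translation in plain Python (the 24-bit split and the addresses are my assumptions, not the actual PSO constants):

OFFSET_BITS = 24                       # assumed page size of 16 MiB
OFFSET_MASK = (1 << OFFSET_BITS) - 1

def encode(segment_index, offset):
    # a shared pointer valid in every process
    return (segment_index << OFFSET_BITS) | offset

def decode(shared_ptr, segment_bases):
    # segment_bases: this process's table of segment index -> local base address
    segment = shared_ptr >> OFFSET_BITS
    offset = shared_ptr & OFFSET_MASK
    return segment_bases[segment] + offset

bases = {0: 0x7f0000000000}            # hypothetical local mapping
p = encode(0, 0x1234)
assert decode(p, bases) == 0x7f0000001234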


Two-versioned data structures


Transactional access requires the ability to cancel a mutation and to not show it until committed. So at least two versions of the data should be available: the publicly visible committed one and the private uncommitted one. Any library capable of providing such data structures and able to reside in a custom shared memory buffer can be used to extend PSO. For example, NumPy arrays support custom buffers, so to become usable in PSO they only require additional work for uncommitted data storage.
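
A toy model of the two-version discipline in plain Python (an illustration only, not the actual C implementation):

class TwoVersionCell:
    """Committed value is what other transactions see; the uncommitted
    value lives only inside the owning transaction until commit."""
    def __init__(self, value):
        self.committed = value
        self.uncommitted = None
        self.dirty = False

    def write(self, value):            # called inside a transaction
        self.uncommitted = value
        self.dirty = True

    def read(self, is_owner):
        # the owner sees its pending write, everyone else the committed value
        return self.uncommitted if (is_owner and self.dirty) else self.committed

    def commit(self):
        if self.dirty:
            self.committed = self.uncommitted
            self.dirty = False

    def rollback(self):
        self.uncommitted = None
        self.dirty = False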


Currently there are only three mutable data structures available:
— ShmList, a list (with a queue-like extension) implemented as a two-level tree;
— ShmDict, an unordered associative array (dict) implemented as two hash tables, each hash table being a two-level tree;
— ShmPromise, which is very simple and just stores two statuses.


The two-level tree resembles persistent data structures and was originally intended for dirty reads (which are not supported at the moment). It's also pretty much the only way to efficiently implement mixed queue-vector functionality, so it came in handy.


A ShmObject is represented by the same ShmDict data type, with the shared dict taking the place of the object's dict attribute; thus every access to an object's attribute is translated into a ShmDict key lookup. Sharing of classes is not implemented, but you might try to create a class with pso.ShmObject as its metaclass (not guaranteed to work though, just a crazy thought).


Fine-grained fair cancelable locks


The most unbridled part of the project. That's where most of the lock-free code is located (obviously, because it is the implementation of the locks). The RW-locking algorithm was inspired by NUMA-Aware Reader-Writer Locks (shoutout to Nir Shavit, my favorite author on concurrency topics). However, those are just regular not-so-fair RW locks, and already you can notice how complex the implementation is. It took much more effort to implement strict fairness and the ability to hold an unlimited number of locks without a deadlock risk. Unfortunately, I cannot simply use queue-based locks (also called hierarchical locks) like the guys from the Linux kernel did, because cancelation is a really hard task for queue-based locks.


My key idea was to always have some global unambiguous priority of tasks, so that on contention one task continues execution and the remaining tasks wait, and thus no deadlocks are possible. Each task takes a ticket from a global atomic counter; an older ticket means higher priority, a newer ticket means lower priority. You might find it similar to a ticket lock, but these tickets are global for every lock in the system. No matter who detects the contention and where, one task gives up the lock or is preempted, and the other task either keeps running or preempts the lower-priority task and waits. So it trades throughput for better fairness — the opposite of what the guys from Oracle and Nir Shavit did. The mechanism can be roughly described as:


take_read_lock


    if preempted then
        return Preempted;
    if check_exclusive() then
        return Ok;
    if !check_priority_vs_writer_and_barrier() then
        return Preempted;
    set_read_lock();
    if preempt_contending_writer() then
        return Wait;
    return Ok;

take_write_lock


    if preempted then
        return Preempted;
    if check_exclusive() then
        return Ok;
    if higher_priority_reader_or_writer_exists() then
        return Preempted;
    set_barrier();
    if preempt_low_priority_readers_and_writers() then
        return Wait;
    if !set_write_lock() then
        return Repeat;
    clear_barrier();
    return Ok;

And the whole execution is enclosed in an infinite loop:


while True do
    result = take_read_lock/take_write_lock();
    if result == Ok then
        break;
    if result == Preempted then
        abort_transaction();
    else if result == Wait then
        wait();

USE_THE_DATA;
...
PROFIT

Here the barrier is an important link to avoid a possible reader-writer misunderstanding. The ability to run multiple readers simultaneously requires a writer to wait for high-priority readers but preempt low-priority ones. And this should happen before the writer gets its lock (it cannot get the lock yet, because higher-priority readers are still running) — that's what the barrier is for: to block lower-priority readers while waiting for higher-priority readers to finish.


This locking mechanism has one important property: readers don't have to check other readers' lock state. Therefore the reader locks can be implemented with so-called cohorting, where each cohort of readers on one processor accesses only its cohort-bound set of read locks, while the only cross-processor interaction involves at least one writer. So for mostly-reading tasks it is possible to minimize cache misses, because those readers would only check cohort-bound reader locks and a cached, unmodified writer lock. Thus the algorithm can easily become NUMA-friendly.


Memory reclamation


As already mentioned, reclamation of memory in a multithreaded environment is challenging. There are only two viable mechanisms: a fully concurrent tracing garbage collector, which can be very hard to implement, or quiescent-state reclamation. Both require some background worker to do the reclamation. Classic concurrent tracing garbage collection allows us to drop reference counting, which has a significant impact on the performance of low-level code, and especially of shared memory, due to cache misses.


However, the only way I see to implement a truly concurrent garbage collector with zero stops is to use reference counting to catch reference modifications without stopping the whole processing; thus the whole "cache optimization advantage due to no reference counting" thing is actually non-existent for this project.


So I picked the quiescent-state idea. Classic RCU (Linux) appends the released objects to a lock-free queue; later, the reclaimer grabs this queue using an atomic swap and triggers reclamation once all workers finish their work (enter a quiescent state). However, the RCU implementation relies on a few assumptions: an object is never reused, is not referenced from multiple containers, and references are never modified outside a transaction. Those are way too restrictive: we cannot forbid the user from reusing an object. To trigger reclamation of a free-floating object we need to count its references, only reclaiming objects with a zero reference counter.
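
A toy model of that modified scheme (plain Python, hypothetical names): released objects sit in a pending list and are freed only after a quiescence point, and only if their reference counter is still zero.

class ReclaimQueue:
    def __init__(self):
        self.pending = []

    def release(self, obj):
        # called when an object's reference count drops to zero
        self.pending.append(obj)

    def reclaim(self, all_workers_quiescent, dealloc):
        # run by the background reclaimer
        if not all_workers_quiescent:
            return
        survivors = []
        for obj in self.pending:
            if obj.refcount == 0:      # still unreferenced: safe to free
                dealloc(obj)
            else:                      # the object was reused ("revived")
                survivors.append(obj)
        self.pending = survivors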


I went one step further: in anticipation of a possible future implementation of dirty reads (or some other mechanism for simultaneous reading and writing) and of possible optimizations reducing reference count mutation (to get rid of temporary increments/decrements hurting the CPU cache), I allowed the reference counter to temporarily become zero during a transaction. Of course, that requires the ability to cancel the reclamation request for such an object, which is most easily accomplished using a second counter (called "revival_count" internally). Thus it is possible to just read shared objects without touching the reference counter and without acquiring any reader/writer locks — those objects will not go away for as long as the transaction is running.


Debugging and testing


Tests cannot be used to prove the absence of bugs. Tests only show the presence of bugs, and a successful pass of a test merely means "this particular bug is not here" — some other bug might still be. You can deal with this problem by introducing more tests, and more, and more, until a lot more development time is spent on tests than on the actual code. And it's so much easier to hide subtle bugs in multitasking code, where a crash might be reproduced one time in 1000 runs. If I were an "enterprise developer" I would paper over the problem and say "it's gone now, I fixed it". Obviously, you cannot do that when you write your code for free. So I took a different approach: if anything can crash, if an error might be detected anywhere — let's detect it and crash right now (through an assertion), without waiting for data corruption or other unpredictable behavior to happen (or for a test to pass successfully). It's the kind of approach the designers of the Rust language took. Unfortunately, neither I nor the Rust designers were able to enforce static (compile-time) checks on shared mutable state and lock-free algorithms.


To be able to check everything, every object has a type tag, and many data structures are redundant and can be verified at a relatively small cost. My benchmarks show that removing all the assertions gives a 10-15% increase in performance. Also, lock-free algorithms often have rarely-hit sequences of actions (not to be confused with code branches), so the second trick is to occasionally trigger unpopular code branches, hoping to hit some rare races. This testing "from inside", instead of testing "from outside" (with classic tests), leads to a radical decrease in bug count due to early detection, when I can still trace a recent modification related to the bug instead of capturing it a few months later. Of course, you cannot test code "from inside" if it's not being executed at all; that's why a joint effort from both directions is required to successfully produce working software.
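
The branch-stressing trick can be as simple as this (a sketch; the names are made up):

import random

STRESS_TESTING = True  # hypothetical debug-build flag

def fast_path_available():
    # In a stress build, randomly pretend the fast path failed so that the
    # rarely-exercised fallback runs often enough to expose race conditions.
    if STRESS_TESTING and random.random() < 0.01:
        return False
    return True  # in reality: the actual fast-path check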


Actually, these "assertions" try to pause all the processes connected to the same coordinator, through kill(SIGSTOP) or PauseThread(). That's because common debuggers are useless for multiprocess systems: they pause one process but let the others run, thus potentially changing the erroneous shared data. I'm yet to find a better way to debug this software.


Unfortunately, there is no ThreadSanitizer for multiprocess systems. And even worse, there are no non-disturbing logging tools for concurrent systems. Usually, logging software requires at least one system call to register an action, which becomes an unintentional synchronization point. However, for race debugging just a few CPU cycles might mean a lot, like the difference between an occasionally reproducible bug and a never reproducible one. I'm really thinking about writing a non-disturbing logger myself.


Last notes and disclaimer


This is not production-ready software. Although it's already quite large, there are still many things to do: lots of the standard CPython containers' functions are not implemented; only objects of the special base class can be shared, and their classes are required to be static and immutable; only Windows and Linux platforms are supported; it's limited to 63 workers; there are some shortcuts limiting the mutation count in one transaction for a single container and limiting the sizes of god knows what things (sorry, writing dynamic structures takes much more effort); it lacks a cyclic garbage collector for shared data; the memory manager cannot release memory back to the OS (though it is able to reclaim memory within already allocated pages); it requires testing, and it surely contains many bugs. It was tested on CPython 3.7 and 3.8 only. And it lacks end-user documentation, although an "end user" will surely run into another bug anyway, so the end user should just follow the trial-and-error method. Don't expect it to be usable in production right now, and don't blame me for any lost dogs or broken hearts caused by this software.
