Pull to refresh

Python и Twisted — Заметки о параллельной обработке данных (мультипроцессности)

Reading time 5 min
Views 33K
imageTwisted — это фреймворк на Python для разработки сетевых приложений, который среди многих других применений, может быть использован и для параллельной обработки данных — мультипроцессности. Это замечательно, но мне пришлось попотеть для того, чтобы найти то, что мне нужно.

Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель — Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted.

Вот мои замечания о текущем примере Брюса.

Я убрал Flex — отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говориться в статье — не трудно распространить и на систему из нескольких компьютеров.

Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .

Вот solver.py который скопирован с оригинала. Настоящая «работа» происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.
"""
solver.py
Original version by Bruce Eckel
Solves one portion of a problem, in a separate process on a separate CPU
"""
import sys, random, math
from twisted.spread import pb
from twisted.internet import reactor

class Solver(pb.Root):

    def __init__(self, id):
        print "solver.py %s: solver init" % id
        self.id = id

    def __str__(self): # String representation
        return "Solver %s" % self.id

    def remote_initialize(self, initArg):
        return "%s initialized" % self

    def step(self, arg):
        print "solver.py %s: solver step" % self.id
        "Simulate work and return result"
        result = 0
        for i in range(random.randint(1000000, 3000000)):
            angle = math.radians(random.randint(0, 45))
            result += math.tanh(angle)/math.cosh(angle)
        return "%s, %s, result: %.2f" % (self, str(arg), result)

    # Alias methods, for demonstration version:
    remote_step1 = step
    remote_step2 = step
    remote_step3 = step

    def remote_status(self):
        print "solver.py %s: remote_status" % self.id
        return "%s operational" % self

    def remote_terminate(self):
        print "solver.py %s: remote_terminate" % self.id
        reactor.callLater(0.5, reactor.stop)
        return "%s terminating..." % self

if __name__ == "__main__":
    port = int(sys.argv[1])
    reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
    reactor.run()



Вот controller.py. Он также скопирован из оригинальной статьи, но я убрал Flex и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне нормально использовать пример. Я также перенес метод terminate из FlexInterface в Controller.
"""
Controller.py
Original version by Bruce Eckel
Starts and manages solvers in separate processes for parallel processing.
"""
import sys
from subprocess import Popen
from twisted.spread import pb
from twisted.internet import reactor, defer

START_PORT = 5566
MAX_PROCESSES = 2

class Controller(object):

    def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
        print "controller.py: broadcasting..."
        deferreds = [solver.callRemote(remoteMethodName, arguments) 
                     for solver in self.solvers.values()]
        print "controller.py: broadcasted"
        reactor.callLater(3, self.checkStatus)

        defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
            nextStep, self.failed, errbackArgs=(failureMessage))
    
    def checkStatus(self):
        print "controller.py: checkStatus"
        for solver in self.solvers.values():
            solver.callRemote("status").addCallbacks(
                lambda r: sys.stdout.write(r + "\n"), self.failed, 
                errbackArgs=("Status Check Failed"))
                                                     
    def failed(self, results, failureMessage="Call Failed"):
        print "controller.py: failed"
        for (success, returnValue), (address, port) in zip(results, self.solvers):
            if not success:
                raise Exception("address: %s port: %d %s" % (address, port, failureMessage))

    def __init__(self):
        print "controller.py: init"
        self.solvers = dict.fromkeys(
            [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)])
        self.pids = [Popen(["python", "solver.py", str(port)]).pid
                     for ip, port in self.solvers]
        print "PIDS: ", self.pids
        self.connected = False
        reactor.callLater(1, self.connect)

    def connect(self):
        print "controller.py: connect"
        connections = []
        for address, port in self.solvers:
            factory = pb.PBClientFactory()
            reactor.connectTCP(address, port, factory)
            connections.append(factory.getRootObject())
        defer.DeferredList(connections, consumeErrors=True).addCallbacks(
            self.storeConnections, self.failed, errbackArgs=("Failed to Connect"))

        print "controller.py: starting parallel jobs"
        self.start()

    def storeConnections(self, results):
        print "controller.py: storeconnections"
        for (success, solver), (address, port) in zip(results, self.solvers):
            self.solvers[address, port] = solver
        print "controller.py: Connected; self.solvers:", self.solvers
        self.connected = True

    def start(self):
        "controller.py: Begin the solving process"
        if not self.connected:
            return reactor.callLater(0.5, self.start)
        self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1")

    def step2(self, results):
        print "controller.py: step 1 results:", results
        self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2")

    def step3(self, results):
        print "controller.py: step 2 results:", results
        self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3")

    def collectResults(self, results):
        print "controller.py: step 3 results:", results
        self.terminate()
        
    def terminate(self):
        print "controller.py: terminate"
        for solver in self.solvers.values():
            solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
        reactor.callLater(1, reactor.stop)
        return "Terminating remote solvers"

if __name__ == "__main__":
    controller = Controller()
    reactor.run()



Чтобы запустить программу, положите оба файла в одну папку и запустите
python controller.py


Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас — 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
controller.py: init
PIDS:  [12173, 12174]
solver.py 5567: solver init
solver.py 5566: solver init
controller.py: connect
controller.py: starting parallel jobs
controller.py: storeconnections
controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
controller.py: broadcasting...
controller.py: broadcasted
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
controller.py: terminate
Solver 5567 operational
solver.py 5566: remote_terminate
solver.py 5567: remote_terminate




Оригинал
Tags:
Hubs:
+26
Comments 10
Comments Comments 10

Articles