Twisted — это фреймворк на Python для разработки сетевых приложений, который среди многих других применений, может быть использован и для параллельной обработки данных — мультипроцессности. Это замечательно, но мне пришлось попотеть для того, чтобы найти то, что мне нужно.
Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель — Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted.
Вот мои замечания о текущем примере Брюса.
Я убрал Flex — отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говориться в статье — не трудно распространить и на систему из нескольких компьютеров.
Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .
Вот solver.py который скопирован с оригинала. Настоящая «работа» происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.
Вот controller.py. Он также скопирован из оригинальной статьи, но я убрал Flex и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне нормально использовать пример. Я также перенес метод terminate из FlexInterface в Controller.
Чтобы запустить программу, положите оба файла в одну папку и запустите
Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас — 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
Оригинал
Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель — Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted.
Вот мои замечания о текущем примере Брюса.
Я убрал Flex — отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говориться в статье — не трудно распространить и на систему из нескольких компьютеров.
Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .
Вот solver.py который скопирован с оригинала. Настоящая «работа» происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.
""" solver.py Original version by Bruce Eckel Solves one portion of a problem, in a separate process on a separate CPU """ import sys, random, math from twisted.spread import pb from twisted.internet import reactor class Solver(pb.Root): def __init__(self, id): print "solver.py %s: solver init" % id self.id = id def __str__(self): # String representation return "Solver %s" % self.id def remote_initialize(self, initArg): return "%s initialized" % self def step(self, arg): print "solver.py %s: solver step" % self.id "Simulate work and return result" result = 0 for i in range(random.randint(1000000, 3000000)): angle = math.radians(random.randint(0, 45)) result += math.tanh(angle)/math.cosh(angle) return "%s, %s, result: %.2f" % (self, str(arg), result) # Alias methods, for demonstration version: remote_step1 = step remote_step2 = step remote_step3 = step def remote_status(self): print "solver.py %s: remote_status" % self.id return "%s operational" % self def remote_terminate(self): print "solver.py %s: remote_terminate" % self.id reactor.callLater(0.5, reactor.stop) return "%s terminating..." % self if __name__ == "__main__": port = int(sys.argv[1]) reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1]))) reactor.run()
Вот controller.py. Он также скопирован из оригинальной статьи, но я убрал Flex и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне нормально использовать пример. Я также перенес метод terminate из FlexInterface в Controller.
""" Controller.py Original version by Bruce Eckel Starts and manages solvers in separate processes for parallel processing. """ import sys from subprocess import Popen from twisted.spread import pb from twisted.internet import reactor, defer START_PORT = 5566 MAX_PROCESSES = 2 class Controller(object): def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage): print "controller.py: broadcasting..." deferreds = [solver.callRemote(remoteMethodName, arguments) for solver in self.solvers.values()] print "controller.py: broadcasted" reactor.callLater(3, self.checkStatus) defer.DeferredList(deferreds, consumeErrors=True).addCallbacks( nextStep, self.failed, errbackArgs=(failureMessage)) def checkStatus(self): print "controller.py: checkStatus" for solver in self.solvers.values(): solver.callRemote("status").addCallbacks( lambda r: sys.stdout.write(r + "\n"), self.failed, errbackArgs=("Status Check Failed")) def failed(self, results, failureMessage="Call Failed"): print "controller.py: failed" for (success, returnValue), (address, port) in zip(results, self.solvers): if not success: raise Exception("address: %s port: %d %s" % (address, port, failureMessage)) def __init__(self): print "controller.py: init" self.solvers = dict.fromkeys( [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)]) self.pids = [Popen(["python", "solver.py", str(port)]).pid for ip, port in self.solvers] print "PIDS: ", self.pids self.connected = False reactor.callLater(1, self.connect) def connect(self): print "controller.py: connect" connections = [] for address, port in self.solvers: factory = pb.PBClientFactory() reactor.connectTCP(address, port, factory) connections.append(factory.getRootObject()) defer.DeferredList(connections, consumeErrors=True).addCallbacks( self.storeConnections, self.failed, errbackArgs=("Failed to Connect")) print "controller.py: starting parallel jobs" self.start() def storeConnections(self, results): print "controller.py: storeconnections" for (success, solver), (address, port) in zip(results, self.solvers): self.solvers[address, port] = solver print "controller.py: Connected; self.solvers:", self.solvers self.connected = True def start(self): "controller.py: Begin the solving process" if not self.connected: return reactor.callLater(0.5, self.start) self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1") def step2(self, results): print "controller.py: step 1 results:", results self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2") def step3(self, results): print "controller.py: step 2 results:", results self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3") def collectResults(self, results): print "controller.py: step 3 results:", results self.terminate() def terminate(self): print "controller.py: terminate" for solver in self.solvers.values(): solver.callRemote("terminate").addErrback(self.failed, "Termination Failed") reactor.callLater(1, reactor.stop) return "Terminating remote solvers" if __name__ == "__main__": controller = Controller() reactor.run()
Чтобы запустить программу, положите оба файла в одну папку и запустите
python controller.py
Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас — 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
controller.py: init PIDS: [12173, 12174] solver.py 5567: solver init solver.py 5566: solver init controller.py: connect controller.py: starting parallel jobs controller.py: storeconnections controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): } controller.py: broadcasting... controller.py: broadcasted solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')] controller.py: broadcasting... controller.py: broadcasted Solver 5567 operational solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')] controller.py: broadcasting... controller.py: broadcasted Solver 5567 operational solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')] controller.py: terminate Solver 5567 operational solver.py 5566: remote_terminate solver.py 5567: remote_terminate
Оригинал