API Documentation
Worker Thread(s) and Process(es).
spoc.BaseProcess(**kwargs)
Abstract Process
Example:
import asyncio
from typing import Any

import spoc

class AsyncProcess(spoc.BaseProcess):
    agent: Any = asyncio  # Example: `uvloop`

    def before(self) -> None:
        ...  # Set uvloop.EventLoopPolicy()

    async def on_event(self, event_type: str):
        ...

    async def server(self):
        while self.active:
            ...

class SyncProcess(spoc.BaseProcess):
    def on_event(self, event_type: str):
        ...

    def server(self):
        while self.active:
            ...
spoc.BaseThread(**kwargs)
Abstract Thread
Example:
import asyncio
from typing import Any

import spoc

class AsyncThread(spoc.BaseThread):
    agent: Any = asyncio  # Example: `uvloop`

    def before(self) -> None:
        ...  # Set uvloop.EventLoopPolicy()

    async def on_event(self, event_type: str):
        ...

    async def server(self):
        while self.active:
            ...

class SyncThread(spoc.BaseThread):
    def on_event(self, event_type: str):
        ...

    def server(self):
        while self.active:
            ...
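The `while self.active:` loop in the examples above is a cooperative shutdown pattern: the worker keeps serving until a flag is cleared. The sketch below illustrates that pattern with the standard library only. It is not how spoc implements its workers; the `LoopWorker` class, its `stop()` method, the `ticks` counter, and the `"startup"`/`"shutdown"` event names are all hypothetical and exist only for illustration.

```python
import threading
import time


class LoopWorker(threading.Thread):
    """Hypothetical stand-in for a worker; NOT part of spoc."""

    def __init__(self, name: str) -> None:
        super().__init__(name=name, daemon=True)
        self._active = threading.Event()
        self.ticks = 0
        self.events: list[str] = []

    @property
    def active(self) -> bool:
        # Mirrors the `self.active` flag checked in the examples above.
        return self._active.is_set()

    def on_event(self, event_type: str) -> None:
        # Event names used here are assumptions, not spoc's.
        self.events.append(event_type)

    def server(self) -> None:
        # Keep working until the flag is cleared by stop().
        while self.active:
            self.ticks += 1
            time.sleep(0.01)

    def start(self) -> None:
        # Set the flag before the thread runs so stop() can never race it.
        self._active.set()
        super().start()

    def run(self) -> None:
        self.on_event("startup")
        try:
            self.server()
        finally:
            self.on_event("shutdown")

    def stop(self, timeout: float = 1.0) -> None:
        # Clear the flag, then wait for the loop to notice and exit.
        self._active.clear()
        self.join(timeout)


worker = LoopWorker(name="One")
worker.start()
time.sleep(0.05)
worker.stop()
```

Because the loop checks the flag on every iteration, the body of `server()` should avoid long blocking calls; otherwise a stop request is only noticed once the current iteration finishes.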
spoc.BaseServer
Control multiple worker Thread(s) and/or Process(es).
Example:
import time

import spoc

class MyProcess(spoc.BaseProcess):  # BaseThread
    def on_event(self, event_type):
        print("Process | Thread:", event_type)

    def server(self):
        while self.active:
            print("My Server", self.options.name)
            time.sleep(2)

class MyServer(spoc.BaseServer):
    @classmethod  # or staticmethod
    def on_event(cls, event_type):
        print("Server:", event_type)

# Press (CTRL + C) to Quit
MyServer.add(MyProcess(name="One"))
MyServer.add(MyProcess(name="Two"))
MyServer.start()
clear()
classmethod
Clean up workers and PIDs.
exit()
staticmethod
Exit the main process.
add(*workers)
classmethod
Add worker instances to the service.
start(infinite_loop=True, timeout=5, forced_delay=1)
classmethod
Start all added workers and optionally keep a loop running until interrupted.
stop(timeout=5, force_stop=False, forced_delay=1)
classmethod
Stop all running workers.
force_stop(delay=1)
classmethod
Force the workers to stop.
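The methods listed above are all class-level, so the server is used without being instantiated. The following standard-library sketch shows one way such a class-level registry could be organised, assuming thread workers that stop cooperatively. It does not reproduce spoc's internals: `Registry`, `Worker`, and `stop_requested` are hypothetical names, and there is no forced stop here because Python threads cannot be killed from outside.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical worker that loops until asked to stop."""

    def __init__(self, name: str) -> None:
        super().__init__(name=name, daemon=True)
        self.stop_requested = threading.Event()

    def run(self) -> None:
        # wait() returns True as soon as the event is set, ending the loop.
        while not self.stop_requested.wait(0.01):
            pass  # one unit of work would go here


class Registry:
    """Hypothetical class-level registry; NOT spoc.BaseServer."""

    workers: list = []

    @classmethod
    def add(cls, *workers: Worker) -> None:
        cls.workers.extend(workers)

    @classmethod
    def start(cls) -> None:
        for worker in cls.workers:
            worker.start()

    @classmethod
    def stop(cls, timeout: float = 5) -> list:
        # Ask every worker to stop first, then wait for each in turn.
        for worker in cls.workers:
            worker.stop_requested.set()
        for worker in cls.workers:
            worker.join(timeout)
        # Names of workers that did not finish within the timeout.
        return [w.name for w in cls.workers if w.is_alive()]

    @classmethod
    def clear(cls) -> None:
        cls.workers.clear()


Registry.add(Worker(name="One"), Worker(name="Two"))
Registry.start()
names = [w.name for w in Registry.workers]
still_running = Registry.stop(timeout=1)
Registry.clear()
```

Signalling all workers before joining any of them keeps the total wait close to the slowest worker's shutdown time rather than the sum of all of them.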