Skip to content

ZeroMQ API Documentation


ZeroMQ Data


zmag.Data(meta=dict(), head=dict(), body=dict()) dataclass

A utility class for handling JSON serialization and compression for ZMQ communication.

set_serializer(serializer) classmethod

Set Custom Serializer

recv(message) classmethod

Receives and deserializes a ZeroMQ message.

send(channel='', command='request', node='')

Serializes and compresses data to send as a ZeroMQ message.


ZeroMQ Backend — Examples


zmag.Device(mode='queue', backend=tcp_string(5556), frontend=tcp_string(5555), publickey=None, secretkey=None)

ZeroMQ Proxy Device

start()

Run Device


zmag.Backend(mode='queue', backend=tcp_string(5556), frontend=tcp_string(5555), name=None, attach=False, publickey=None, secretkey=None, serverkey=None)

ZeroMQ Backend Server

recv() async

Receive Request (JSON).

send(__head__=None, **kwargs) async

Send Response (JSON).

Parameters:

  • __head__ (dict | None, default: None ) –

    Request headers.

  • body (**kwargs) –

    Request body.

Example:

backend = zmag.Backend(...)

backend.send({"token":"secret"}, message = "hello world")

publish(channel, data) async

Send Pub (JSON).

Parameters:

  • data (Data) –

    description

Example:

backend = zmag.Backend(...)

data = zmag.Data(body={"message": "hello world"})

backend.publish("channel", data)

push(data) async

Send Push Work (JSON).

Parameters:

  • data (Data) –

    description

Example:

backend = zmag.Backend(...)

data = zmag.Data(body={"message": "hello world"})

backend.push(data)

zmag.pub(obj=None, *, channel=None, seconds=0, minutes=0, hours=0, days=0)

This decorator transforms an async function into a ZeroMQ publisher, enabling it to broadcast data to the frontend.

The function publishes either zmag.Data or None based on the availability of updates.

Returns:

  • Data | None

    Return zmag.Data if updates are available, None If no updates are available.

Example:

@zmag.pub # or zmag.pub(seconds=5)
async def topic(): # `topic` is the channel
    response = zmag.Data()
    response.body = {"message": "hello world"}
    return response

@zmag.pub(seconds=5)
async def generic(): # Custom `channel`
    response = zmag.Data()
    response.meta["channel"] = "custom"
    ...

@zmag.pub
async def graphql(context): # GraphQL
    gql_query = "query { books { id title } }"
    results = await context.schema.execute(gql_query)
    ...

zmag.push(obj=None, *, seconds=0, minutes=0, hours=0, days=0)

This decorator transforms an async function into a ZeroMQ pusher, allowing it to send data for the frontend to consume.

The function pushes either zmag.Data or None, based on whether there is new data available.

Returns:

  • Data | None

    Return zmag.Data If there is data to be sent, None If there is no data to send.

Example:

@zmag.push # or zmag.push(seconds=5)
async def push_method(): #
    response = zmag.Data()
    response.body = {"message": "hello world"}
    return response

@zmag.push
async def push_graphql(context): # GraphQL
    gql_query = "query { books { id title } }"
    results = await context.schema.execute(gql_query)
    ...

ZeroMQ Frontend — Examples


zmag.Frontend(mode='queue', host=tcp_string(5555), timeout=5000, is_sync=False, ssh=None, publickey=None, secretkey=None, serverkey=None)

ZeroMQ Frontend Client

Parameters:

  • host (str, default: tcp_string(5555) ) –

    description.

  • timeout (int, default: 5000 ) –

    description.

  • is_sync (bool, default: False ) –

    description.

  • ssh (ConfigSSH | None, default: None ) –

    description.

  • publickey (str | None, default: None ) –

    description.

  • secretkey (str | None, default: None ) –

    description.

  • serverkey (str | None, default: None ) –

    description.

request(query=None, operation=None, variables=None, context=None)

Sends a Request to the server.

Parameters:

  • query (str | None, default: None ) –

    description.

  • operation (str | None, default: None ) –

    description.

  • variables (dict | None, default: None ) –

    description.

  • context (dict | None, default: None ) –

    description.

Example:

client = zmag.Frontend(...)

response = await client.request(...)
print(response)

subscribe(channel='')

Subscribes to a ZeroMQ channel to receive messages and updates in real-time.

Parameters:

  • channel (str, default: '' ) –

    description.

Example:

client = zmag.Frontend(...)

while True:
    message = await client.subscribe("") # or "some_channel"
    print("Received:", message)

pull()

Consumes data from a ZeroMQ producer, processing incoming workloads.

Example:

client = zmag.Frontend(...)

while True:
    work = await client.pull()
    print("Received:", work)

zmag.ConfigSSH(host, keyfile=None, password=None, paramiko=None, timeout=60) dataclass

ZeroMQ SSH Configuration

  • https://pyzmq.readthedocs.io/en/latest/howto/ssh.html