StompWiseAgentTransport

Bases: WiseAgentTransport

A transport for sending messages between agents using the STOMP protocol.

Source code in wiseagents/transports/stomp.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class StompWiseAgentTransport(WiseAgentTransport):
    '''A transport for sending messages between agents using the STOMP protocol.

    Two broker connections are used: ``conn`` is subscribed to this agent's
    request queue and ``conn2`` to its response queue.'''

    yaml_tag = u'!wiseagents.transport.StompWiseAgentTransport'
    # Class-level defaults so both attributes read as None until start() is called.
    conn : stomp.Connection = None
    conn2 : stomp.Connection = None

    def __init__(self, host: str, port: int, agent_name: str):
        '''Initialize the transport.

        Args:
            host (str): the host
            port (int): the port
            agent_name (str): the agent name'''
        self._host = host
        self._port = port
        self._agent_name = agent_name

    def __repr__(self) -> str:
        '''Return the parent representation followed by host, port and agent name.'''
        return super().__repr__() + f"host={self._host}, port={self._port}, agent_name={self._agent_name}"

    def __getstate__(self) -> object:
        '''Return the state of the transport for serialization by pyyaml.

        The receiver callbacks and the live connections are left out so that
        they are not serialized/deserialized. Entries that were never set are
        simply skipped.'''
        state = self.__dict__.copy()
        transient = ('_request_receiver', '_response_receiver',
                     '_event_receiver', '_error_receiver',
                     'conn', 'conn2')
        for attribute in transient:
            # pop() with a default: a receiver that was never set, or a
            # transport that was never started, must not raise KeyError.
            state.pop(attribute, None)
        return state

    def _connect_and_subscribe(self, connection, destination: str, subscription_id: int):
        '''Connect ``connection`` to the broker and subscribe it to ``destination``.

        Credentials are read from the environment variables STOMP_USER and
        STOMP_PASSWORD.'''
        connection.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        connection.subscribe(destination=destination, id=subscription_id, ack='auto')

    def _ensure_connected(self):
        '''Start the transport if needed and restore any connection that dropped.'''
        if self.conn is None or self.conn2 is None:
            self.start()
            return
        # A subscription does not survive a dropped connection, so every
        # reconnect is followed by a fresh subscribe; otherwise this agent
        # would silently stop receiving messages on that queue.
        if not self.conn.is_connected():
            self._connect_and_subscribe(self.conn, self.request_queue, id(self))
        if not self.conn2.is_connected():
            self._connect_and_subscribe(self.conn2, self.response_queue, id(self) + 1)

    def start(self):
        '''
        Start the transport.

        Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.'''
        hosts = [(self.host, self.port)]
        self.conn = stomp.Connection(host_and_ports=hosts, heartbeats=(60000, 60000))
        self.conn.set_listener('WiseAgentRequestTopicListener', WiseAgentRequestQueueListener(self))
        self._connect_and_subscribe(self.conn, self.request_queue, id(self))

        self.conn2 = stomp.Connection(host_and_ports=hosts, heartbeats=(60000, 60000))
        self.conn2.set_listener('WiseAgentResponseQueueListener', WiseAgentResponseQueueListener(self))
        # The response subscription uses id(self) + 1 so the two ids differ.
        self._connect_and_subscribe(self.conn2, self.response_queue, id(self) + 1)

    def send_request(self, message: WiseAgentMessage, dest_agent_name: str):
        '''Send a request message to an agent.

        Args:
            message (WiseAgentMessage): the message to send
            dest_agent_name (str): the destination agent name'''
        self._ensure_connected()
        request_destination = '/queue/request/' + dest_agent_name
        logging.debug("Sending request %s to %s", message, request_destination)
        self.conn.send(body=yaml.dump(message), destination=request_destination)

    def send_response(self, message: WiseAgentMessage, dest_agent_name: str):
        '''Send a response message to an agent.

        Args:
            message (WiseAgentMessage): the message to send
            dest_agent_name (str): the destination agent name'''
        # Same connection handling as send_request, so a dropped connection
        # is restored here as well instead of failing the send.
        self._ensure_connected()
        response_destination = '/queue/response/' + dest_agent_name
        self.conn2.send(body=yaml.dump(message), destination=response_destination)

    def stop(self):
        '''Stop the transport.

        Unsubscribes from the request and response queues, then disconnects
        from the STOMP server. A connection that was never created or that is
        no longer connected is skipped instead of raising.'''
        subscriptions = (
            (self.conn, self.request_queue, id(self)),
            (self.conn2, self.response_queue, id(self) + 1),
        )
        active = [entry for entry in subscriptions
                  if entry[0] is not None and entry[0].is_connected()]
        for connection, destination, subscription_id in active:
            connection.unsubscribe(destination=destination, id=subscription_id)
        for connection, _destination, _subscription_id in active:
            connection.disconnect()

    @property
    def host(self) -> str:
        '''Get the host.'''
        return self._host

    @property
    def port(self) -> int:
        '''Get the port.'''
        return self._port

    @property
    def agent_name(self) -> str:
        '''Get the agent name.'''
        return self._agent_name

    @property
    def request_queue(self) -> str:
        '''Get the request queue.'''
        return '/queue/request/' + self.agent_name

    @property
    def response_queue(self) -> str:
        '''Get the response queue.'''
        return '/queue/response/' + self.agent_name

agent_name: str property

Get the agent name.

host: str property

Get the host.

port: int property

Get the port.

request_queue: str property

Get the request queue.

response_queue: str property

Get the response queue.

__getstate__()

Return the state of the transport, leaving out the receiver callbacks so that they are not serialized/deserialized by pyyaml.

Source code in wiseagents/transports/stomp.py
76
77
78
79
80
81
82
83
def __getstate__(self) -> object:
    '''Return the state of the transport for serialization by pyyaml.

    The receiver callbacks and the live connections are left out so that
    they are not serialized/deserialized. Entries that were never set are
    simply skipped.'''
    state = self.__dict__.copy()
    transient = ('_request_receiver', '_response_receiver',
                 '_event_receiver', '_error_receiver',
                 'conn', 'conn2')
    for attribute in transient:
        # pop() with a default: a receiver that was never set, or a
        # transport that was never started, must not raise KeyError.
        state.pop(attribute, None)
    return state

__init__(host, port, agent_name)

Initialize the transport.

Parameters:
  • host (str) –

    the host

  • port (int) –

    the port

  • agent_name (str) –

    the agent name

Source code in wiseagents/transports/stomp.py
61
62
63
64
65
66
67
68
69
70
def __init__(self, host: str, port: int, agent_name: str):
    '''Store the connection settings of the transport.

    Args:
        host (str): the host
        port (int): the port
        agent_name (str): the agent name'''
    self._host, self._port, self._agent_name = host, port, agent_name

send_request(message, dest_agent_name)

Send a request message to an agent.

Parameters:
  • message (WiseAgentMessage) –

    the message to send

  • dest_agent_name (str) –

    the destination agent name

Source code in wiseagents/transports/stomp.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def send_request(self, message: WiseAgentMessage, dest_agent_name: str):
    '''Send a request message to an agent.

    Starts the transport if it has not been started yet, and reconnects and
    re-subscribes any connection that is no longer connected.

    Args:
        message (WiseAgentMessage): the message to send
        dest_agent_name (str): the destination agent name'''
    if self.conn is None or self.conn2 is None:
        self.start()
    # A subscription does not survive a dropped connection, so every
    # reconnect is followed by a fresh subscribe; otherwise this agent
    # would silently stop receiving messages on that queue.
    if not self.conn.is_connected():
        self.conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.conn.subscribe(destination=self.request_queue, id=id(self), ack='auto')
    if not self.conn2.is_connected():
        self.conn2.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.conn2.subscribe(destination=self.response_queue, id=id(self) + 1, ack='auto')
    request_destination = '/queue/request/' + dest_agent_name
    logging.debug("Sending request %s to %s", message, request_destination)
    self.conn.send(body=yaml.dump(message), destination=request_destination)

send_response(message, dest_agent_name)

Send a response message to an agent.

Parameters:
  • message (WiseAgentMessage) –

    the message to send

  • dest_agent_name (str) –

    the destination agent name

Source code in wiseagents/transports/stomp.py
121
122
123
124
125
126
127
128
129
130
131
def send_response(self, message: WiseAgentMessage, dest_agent_name: str):
    '''Send a response message to an agent.

    Starts the transport if it has not been started yet, and reconnects and
    re-subscribes any connection that is no longer connected, in the same
    way as send_request.

    Args:
        message (WiseAgentMessage): the message to send
        dest_agent_name (str): the destination agent name'''
    if self.conn is None or self.conn2 is None:
        self.start()
    # A subscription does not survive a dropped connection, so every
    # reconnect is followed by a fresh subscribe.
    if not self.conn.is_connected():
        self.conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.conn.subscribe(destination=self.request_queue, id=id(self), ack='auto')
    if not self.conn2.is_connected():
        self.conn2.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.conn2.subscribe(destination=self.response_queue, id=id(self) + 1, ack='auto')
    response_destination = '/queue/response/' + dest_agent_name
    self.conn2.send(body=yaml.dump(message), destination=response_destination)

start()

Start the transport. Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.

Source code in wiseagents/transports/stomp.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def start(self):
    '''
    Open both broker connections and subscribe to this agent's queues.

    Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.'''
    broker = [(self.host, self.port)]
    base_id = id(self)

    # First connection: receives the requests addressed to this agent.
    self.conn = stomp.Connection(host_and_ports=broker, heartbeats=(60000, 60000))
    request_conn = self.conn
    request_conn.set_listener('WiseAgentRequestTopicListener', WiseAgentRequestQueueListener(self))
    request_conn.connect(os.environ.get("STOMP_USER"), os.environ.get("STOMP_PASSWORD"), wait=True)
    request_conn.subscribe(destination=self.request_queue, id=base_id, ack='auto')

    # Second connection: receives the responses addressed to this agent.
    self.conn2 = stomp.Connection(host_and_ports=broker, heartbeats=(60000, 60000))
    response_conn = self.conn2
    response_conn.set_listener('WiseAgentResponseQueueListener', WiseAgentResponseQueueListener(self))
    response_conn.connect(os.environ.get("STOMP_USER"), os.environ.get("STOMP_PASSWORD"), wait=True)
    # The response subscription uses the next integer so the two ids differ.
    response_conn.subscribe(destination=self.response_queue, id=base_id + 1, ack='auto')

stop()

Stop the transport.

Source code in wiseagents/transports/stomp.py
133
134
135
136
137
138
139
140
141
142
def stop(self):
    '''Stop the transport.

    Unsubscribes from the request and response queues, then disconnects
    from the STOMP server. A connection that was never created or that is
    no longer connected is skipped instead of raising.'''
    subscriptions = (
        (self.conn, self.request_queue, id(self)),
        (self.conn2, self.response_queue, id(self) + 1),
    )
    active = [entry for entry in subscriptions
              if entry[0] is not None and entry[0].is_connected()]
    # Unsubscribe everything first, then disconnect.
    for connection, destination, subscription_id in active:
        connection.unsubscribe(destination=destination, id=subscription_id)
    for connection, _destination, _subscription_id in active:
        connection.disconnect()

WiseAgentRequestQueueListener

Bases: ConnectionListener

A listener for the request queue.

Source code in wiseagents/transports/stomp.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class WiseAgentRequestQueueListener(stomp.ConnectionListener):
    '''A listener for the request queue.'''

    def __init__(self, transport: WiseAgentTransport):
        '''Initialize the listener.

        Args:
            transport (WiseAgentTransport): the transport'''
        self.transport = transport

    def on_event(self, event):
        '''Handle an event by passing it to the transport's event receiver.'''
        self.transport.event_receiver(event)

    def on_error(self, error):
        '''Handle an error by passing it to the transport's error receiver.'''
        self.transport.error_receiver(error)

    def on_message(self, message: stomp.utils.Frame):
        '''Handle a message.

        The frame body is parsed as YAML and the result is passed to the
        transport's request receiver.

        Args:
            message (stomp.utils.Frame): the received frame'''
        # %-style arguments are only formatted when DEBUG logging is enabled,
        # so nothing is rendered per message in normal operation.
        logging.debug("%s: Received message: %s", self, message)
        logging.debug("Received message type: %s", message.__class__)
        logging.debug("Calling the callback function: %s", self.transport.request_receiver)
        # NOTE(security): yaml.Loader can construct arbitrary Python objects
        # from the message body, so this is only safe while every producer on
        # the broker is trusted. Presumably the full loader is needed to
        # rebuild the message object - confirm before restricting it.
        self.transport.request_receiver(yaml.load(message.body, yaml.Loader))

__init__(transport)

Initialize the listener.

Parameters:
  • transport (WiseAgentTransport) –

    the transport

Source code in wiseagents/transports/stomp.py
12
13
14
15
16
17
def __init__(self, transport: WiseAgentTransport) -> None:
    '''Initialize the listener.

    Args:
        transport (WiseAgentTransport): the transport whose event, error and
            request receivers this listener calls'''
    self.transport = transport

on_error(error)

Handle an error.

Source code in wiseagents/transports/stomp.py
23
24
25
def on_error(self, error):
    '''Forward the error to the transport's error receiver.'''
    report = self.transport.error_receiver
    report(error)

on_event(event)

Handle an event.

Source code in wiseagents/transports/stomp.py
19
20
21
def on_event(self, event):
    '''Forward the event to the transport's event receiver.'''
    notify = self.transport.event_receiver
    notify(event)

on_message(message)

Handle a message.

Source code in wiseagents/transports/stomp.py
27
28
29
30
31
32
def on_message(self, message: stomp.utils.Frame):
    '''Handle a message.

    The frame body is parsed as YAML and the result is passed to the
    transport's request receiver.

    Args:
        message (stomp.utils.Frame): the received frame'''
    # %-style arguments are only formatted when DEBUG logging is enabled,
    # so nothing is rendered per message in normal operation.
    logging.debug("%s: Received message: %s", self, message)
    logging.debug("Received message type: %s", message.__class__)
    logging.debug("Calling the callback function: %s", self.transport.request_receiver)
    # NOTE(security): yaml.Loader can construct arbitrary Python objects
    # from the message body, so this is only safe while every producer on
    # the broker is trusted. Presumably the full loader is needed to
    # rebuild the message object - confirm before restricting it.
    self.transport.request_receiver(yaml.load(message.body, yaml.Loader))

WiseAgentResponseQueueListener

Bases: ConnectionListener

A listener for the response queue.

Source code in wiseagents/transports/stomp.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class WiseAgentResponseQueueListener(stomp.ConnectionListener):
    '''A listener for the response queue.'''

    def __init__(self, transport: WiseAgentTransport):
        '''Initialize the listener.

        Args:
            transport (WiseAgentTransport): the transport'''
        self.transport = transport

    def on_error(self, error):
        '''Handle an error by passing it to the transport's error receiver.'''
        self.transport.error_receiver(error)

    def on_message(self, message: stomp.utils.Frame):
        '''Handle a message.

        The frame body is parsed as YAML and the result is passed to the
        transport's response receiver.

        Args:
            message (stomp.utils.Frame): the received frame'''
        # %-style arguments are only formatted when DEBUG logging is enabled,
        # so nothing is rendered per message in normal operation.
        logging.debug("Received message: %s", message)
        logging.debug("Received message type: %s", message.__class__)

        # NOTE(security): yaml.Loader can construct arbitrary Python objects
        # from the message body, so this is only safe while every producer on
        # the broker is trusted. Presumably the full loader is needed to
        # rebuild the message object - confirm before restricting it.
        self.transport.response_receiver(yaml.load(message.body, yaml.Loader))

__init__(transport)

Initialize the listener.

Parameters:
  • transport (WiseAgentTransport) –

    the transport

Source code in wiseagents/transports/stomp.py
36
37
38
39
40
41
def __init__(self, transport: WiseAgentTransport) -> None:
    '''Initialize the listener.

    Args:
        transport (WiseAgentTransport): the transport whose error and
            response receivers this listener calls'''
    self.transport = transport

on_error(error)

Handle an error.

Source code in wiseagents/transports/stomp.py
43
44
45
def on_error(self, error):
    '''Forward the error to the transport's error receiver.'''
    report = self.transport.error_receiver
    report(error)

on_message(message)

Handle a message.

Source code in wiseagents/transports/stomp.py
47
48
49
50
51
52
def on_message(self, message: stomp.utils.Frame):
    '''Handle a message.

    The frame body is parsed as YAML and the result is passed to the
    transport's response receiver.

    Args:
        message (stomp.utils.Frame): the received frame'''
    # %-style arguments are only formatted when DEBUG logging is enabled,
    # so nothing is rendered per message in normal operation.
    logging.debug("Received message: %s", message)
    logging.debug("Received message type: %s", message.__class__)

    # NOTE(security): yaml.Loader can construct arbitrary Python objects
    # from the message body, so this is only safe while every producer on
    # the broker is trusted. Presumably the full loader is needed to
    # rebuild the message object - confirm before restricting it.
    self.transport.response_receiver(yaml.load(message.body, yaml.Loader))