StompWiseAgentTransport

Bases: WiseAgentTransport

A transport for sending messages between agents using the STOMP protocol.

Source code in wiseagents/transports/stomp.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class StompWiseAgentTransport(WiseAgentTransport):
    '''A transport for sending messages between agents using the STOMP protocol.

    The transport keeps two STOMP connections: one subscribed to this agent's
    request queue and one subscribed to its response queue. Credentials are
    read from the STOMP_USER and STOMP_PASSWORD environment variables.'''

    yaml_tag = u'!wiseagents.transports.StompWiseAgentTransport'
    # Class-level defaults so both attributes can be read before start() is called.
    request_conn : stomp.Connection = None
    response_conn : stomp.Connection = None

    def __init__(self, host: str, port: int, agent_name: str):
        '''Initialize the transport.

        Args:
            host (str): the host
            port (int): the port
            agent_name (str): the agent name'''
        self._host = host
        self._port = port
        self._agent_name = agent_name

    def __repr__(self) -> str:
        '''Return a short description of the transport (host, port and agent name).'''
        return f"host={self._host}, port={self._port}, agent_name={self._agent_name}"

    def __getstate__(self) -> object:
        '''Return the state of the transport without the STOMP connections,
        so that they are not serialized/deserialized by pyyaml.'''
        state = super().__getstate__()
        # The connections only become instance attributes once start() has run,
        # so the keys may be absent; pop() tolerates that where del would raise.
        state.pop('request_conn', None)
        state.pop('response_conn', None)
        return state

    def _new_connection(self, listener_name: str, listener) -> stomp.Connection:
        '''Create an unconnected STOMP connection with the given listener registered.'''
        hosts = [(self.host, self.port)]
        conn = stomp.Connection(host_and_ports=hosts, heartbeats=(60000, 60000))
        conn.set_listener(listener_name, listener)
        return conn

    def _connect_and_subscribe(self, conn: stomp.Connection, destination: str, subscription_id: int):
        '''Connect conn with the environment credentials and subscribe it to destination.'''
        conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        conn.subscribe(destination=destination, id=subscription_id, ack='auto')

    def _ensure_connected(self):
        '''Make sure both connections exist, are connected and are subscribed.'''
        if self.request_conn is None or self.response_conn is None:
            self.start()
        # A reconnect opens a new STOMP session, so the subscription has to be
        # issued again; otherwise no further messages would be delivered.
        if not self.request_conn.is_connected():
            self._connect_and_subscribe(self.request_conn, self.request_queue, id(self))
        if not self.response_conn.is_connected():
            self._connect_and_subscribe(self.response_conn, self.response_queue, id(self) + 1)

    def start(self):
        '''Start the transport.

        Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.
        Each connection is handled independently: a connection that is already
        connected is left untouched, so calling start() again is a no-op when
        both connections are up.'''
        if self.request_conn is None or not self.request_conn.is_connected():
            # Assign before connecting so the attribute is already set when the
            # listener starts receiving messages.
            self.request_conn = self._new_connection(
                'WiseAgentRequestTopicListener', WiseAgentRequestQueueListener(self))
            self._connect_and_subscribe(self.request_conn, self.request_queue, id(self))

        if self.response_conn is None or not self.response_conn.is_connected():
            self.response_conn = self._new_connection(
                'WiseAgentResponseQueueListener', WiseAgentResponseQueueListener(self))
            # id(self) + 1 keeps the response subscription id distinct from the request one.
            self._connect_and_subscribe(self.response_conn, self.response_queue, id(self) + 1)

    def send_request(self, message: WiseAgentMessage, dest_agent_name: str):
        '''Send a request message to an agent.

        The transport is started or reconnected first if necessary.

        Args:
            message (WiseAgentMessage): the message to send
            dest_agent_name (str): the destination agent name'''
        self._ensure_connected()
        request_destination = '/queue/request/' + dest_agent_name
        logging.getLogger(__name__).debug(f"Sending request {message} to {request_destination}")
        # The message is serialized to YAML and sent as the STOMP frame body.
        self.request_conn.send(body=yaml.dump(message), destination=request_destination)

    def send_response(self, message: WiseAgentMessage, dest_agent_name: str):
        '''Send a response message to an agent.

        The transport is started or reconnected first if necessary.

        Args:
            message (WiseAgentMessage): the message to send
            dest_agent_name (str): the destination agent name'''
        self._ensure_connected()
        response_destination = '/queue/response/' + dest_agent_name
        # The message is serialized to YAML and sent as the STOMP frame body.
        self.response_conn.send(body=yaml.dump(message), destination=response_destination)

    def stop(self):
        '''Stop the transport.

        Unsubscribes and disconnects each connection that is currently connected.'''
        if self.request_conn is not None and self.request_conn.is_connected():
            # Unsubscribe from the request queue
            self.request_conn.unsubscribe(destination=self.request_queue, id=id(self))
            # Disconnect request from the STOMP server
            self.request_conn.disconnect()
        if self.response_conn is not None and self.response_conn.is_connected():
            # Unsubscribe from the response queue
            self.response_conn.unsubscribe(destination=self.response_queue, id=id(self) + 1)
            # Disconnect response from the STOMP server
            self.response_conn.disconnect()

    @property
    def host(self) -> str:
        '''Get the host.'''
        return self._host

    @property
    def port(self) -> int:
        '''Get the port.'''
        return self._port

    @property
    def agent_name(self) -> str:
        '''Get the agent name.'''
        return self._agent_name

    @property
    def request_queue(self) -> str:
        '''Get the request queue.'''
        return '/queue/request/' + self.agent_name

    @property
    def response_queue(self) -> str:
        '''Get the response queue.'''
        return '/queue/response/' + self.agent_name

agent_name: str property

Get the agent name.

host: str property

Get the host.

port: int property

Get the port.

request_queue: str property

Get the request queue.

response_queue: str property

Get the response queue.

__getstate__()

Return the state of the transport, with the STOMP connections removed so that they are not serialized/deserialized by pyyaml.

Source code in wiseagents/transports/stomp.py
73
74
75
76
77
78
def __getstate__(self) -> object:
    '''Return the state of the transport without the STOMP connections,
    so that they are not serialized/deserialized by pyyaml.'''
    state = super().__getstate__()
    # The connections only become instance attributes once start() has run,
    # so the keys may be absent; pop() tolerates that where del would raise.
    state.pop('request_conn', None)
    state.pop('response_conn', None)
    return state

__init__(host, port, agent_name)

Initialize the transport.

Parameters:
  • host (str) –

    the host

  • port (int) –

    the port

  • agent_name (str) –

    the agent name

Source code in wiseagents/transports/stomp.py
58
59
60
61
62
63
64
65
66
67
def __init__(self, host: str, port: int, agent_name: str):
    '''Create a transport bound to a STOMP server and an agent.

    Args:
        host (str): host of the STOMP server
        port (int): port of the STOMP server
        agent_name (str): name of the agent using this transport'''
    # Stored privately; exposed through the read-only properties of the class.
    self._agent_name = agent_name
    self._host, self._port = host, port

send_request(message, dest_agent_name)

Send a request message to an agent.

Parameters:
  • message (WiseAgentMessage) –

    the message to send

  • dest_agent_name (str) –

    the destination agent name

Source code in wiseagents/transports/stomp.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def send_request(self, message: WiseAgentMessage, dest_agent_name: str):
    '''Send a request message to an agent.

    The transport is started first if it has no connections yet, and a
    connection that has dropped is reconnected and re-subscribed.

    Args:
        message (WiseAgentMessage): the message to send
        dest_agent_name (str): the destination agent name'''
    if self.request_conn is None or self.response_conn is None:
        self.start()
    # A reconnect opens a new STOMP session, so the subscription has to be
    # issued again; otherwise no further messages would be delivered.
    if not self.request_conn.is_connected():
        self.request_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.request_conn.subscribe(destination=self.request_queue, id=id(self), ack='auto')
    if not self.response_conn.is_connected():
        self.response_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.response_conn.subscribe(destination=self.response_queue, id=id(self) + 1, ack='auto')
    request_destination = '/queue/request/' + dest_agent_name
    logging.getLogger(__name__).debug(f"Sending request {message} to {request_destination}")
    # The message is serialized to YAML and sent as the STOMP frame body.
    self.request_conn.send(body=yaml.dump(message), destination=request_destination)

send_response(message, dest_agent_name)

Send a response message to an agent.

Parameters:
  • message (WiseAgentMessage) –

    the message to send

  • dest_agent_name (str) –

    the destination agent name

Source code in wiseagents/transports/stomp.py
118
119
120
121
122
123
124
125
126
127
128
def send_response(self, message: WiseAgentMessage, dest_agent_name: str):
    '''Send a response message to an agent.

    The transport is started first if it has no connections yet, and a
    connection that has dropped is reconnected and re-subscribed, mirroring
    send_request.

    Args:
        message (WiseAgentMessage): the message to send
        dest_agent_name (str): the destination agent name'''
    if self.request_conn is None or self.response_conn is None:
        self.start()
    # A reconnect opens a new STOMP session, so the subscription has to be
    # issued again; otherwise no further messages would be delivered.
    if not self.request_conn.is_connected():
        self.request_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.request_conn.subscribe(destination=self.request_queue, id=id(self), ack='auto')
    if not self.response_conn.is_connected():
        self.response_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.response_conn.subscribe(destination=self.response_queue, id=id(self) + 1, ack='auto')
    response_destination = '/queue/response/' + dest_agent_name
    # The message is serialized to YAML and sent as the STOMP frame body.
    self.response_conn.send(body=yaml.dump(message), destination=response_destination)

start()

Start the transport. Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.

Source code in wiseagents/transports/stomp.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def start(self):
    '''Start the transport.

    Requires the environment variables STOMP_USER and STOMP_PASSWORD to be set.
    Each connection is handled independently: a connection that is already
    connected is left untouched, so calling start() again is a no-op when
    both connections are up.'''
    hosts = [(self.host, self.port)]

    if self.request_conn is None or not self.request_conn.is_connected():
        self.request_conn = stomp.Connection(host_and_ports=hosts, heartbeats=(60000, 60000))
        self.request_conn.set_listener('WiseAgentRequestTopicListener', WiseAgentRequestQueueListener(self))
        self.request_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        self.request_conn.subscribe(destination=self.request_queue, id=id(self), ack='auto')

    if self.response_conn is None or not self.response_conn.is_connected():
        self.response_conn = stomp.Connection(host_and_ports=hosts, heartbeats=(60000, 60000))
        self.response_conn.set_listener('WiseAgentResponseQueueListener', WiseAgentResponseQueueListener(self))
        self.response_conn.connect(os.getenv("STOMP_USER"), os.getenv("STOMP_PASSWORD"), wait=True)
        # id(self) + 1 keeps the response subscription id distinct from the request one.
        self.response_conn.subscribe(destination=self.response_queue, id=id(self) + 1, ack='auto')

stop()

Stop the transport.

Source code in wiseagents/transports/stomp.py
130
131
132
133
134
135
136
137
138
139
140
141
def stop(self):
    '''Shut the transport down.

    For each of the two connections that exists and is currently connected,
    drop its queue subscription and then disconnect from the STOMP server.'''
    request = self.request_conn
    if request is not None and request.is_connected():
        request.unsubscribe(destination=self.request_queue, id=id(self))
        request.disconnect()

    response = self.response_conn
    if response is not None and response.is_connected():
        # The response subscription was registered under id(self) + 1.
        response.unsubscribe(destination=self.response_queue, id=id(self) + 1)
        response.disconnect()