Magentix2
2.1.1
|
Classes | |
class | Listener |
class | TraceListener |
Public Member Functions | |
BaseAgent (AgentID aid) throws Exception | |
void | changeIdentity (AgentID _aid) throws Exception |
void | send (ACLMessage msg) |
void | sendTraceEvent (TraceEvent tEvent) throws TraceServiceNotAllowedException |
TraceMask | updateTraceMask () throws TraceSystemUnreachableException |
TraceMask | updateTraceMask (int timeOut) throws TraceSystemUnreachableException |
void | finalize () |
void | run () |
void | start () |
String | getName () |
AgentID | getAid () |
TraceMask | getTraceMask () |
AgentID | getTraceManager () |
Static Public Attributes | |
static final String | SYSTEM_NAME = "system" |
static final AgentID | SYSTEM_AID = new AgentID(SYSTEM_NAME) |
Protected Member Functions | |
void | init () |
void | execute () |
void | preOnMessage (ACLMessage msg) throws Exception |
void | onMessage (ACLMessage msg) throws Exception |
void | onTraceEvent (TraceEvent tEvent) |
void | terminate () |
boolean | existAgent (AgentID aid) |
Protected Attributes | |
Logger | logger = Logger.getLogger(BaseAgent.class) |
Session | session |
Session | traceSession |
TraceMask | traceMask |
Base class to inherit when implementing a basic Magentix agent. This class provides mechanisms to enable communication between agents. A good alternative is to inherit from SingleAgent, which provides methods (callbacks) when a message arrives, etc. If this class is too basic, consider to inherit from one of the two possibilities given by the QueueAgent and the CAgent.
Definition at line 61 of file BaseAgent.java.
es.upv.dsic.gti_ia.core.BaseAgent.BaseAgent | ( | AgentID | aid | ) | throws Exception |
Creates a new agent in an open broker connection
aid | Agent identification for the new agent, it has to be unique on the platform |
Exception | If Agent ID already exists on the platform |
Definition at line 239 of file BaseAgent.java.
{ // Si no estamos en modo seguro funcionara como siempre, es por tanto // transparente al programador. if (c.isSecureMode()) { propFile = new FileInputStream( "./configuration/securityUser.properties"); propSecurity = new Properties(); try { // Nuevo fichero para la configuración de datos para la // seguridad. propSecurity.load(propFile); } catch (FileNotFoundException e) { logger.error(e); } catch (IOException e) { logger.error(e); e.printStackTrace(); } st = SecurityTools.GetInstance(); // Vemos si el usuario ya posee algún certificado para ese agente. // Se comprueba también // la validez. // Este método es el encargado de crear todo el proceso de solicitud // y creación de // certificados para los // agentes del usuario. Podemos encontrarlo en la clase // SecurityTools del paquete // secure. if (st.generateAllProcessCertificate(aid.name, propSecurity)) { connection = null; // El alias sera el mismo que el nombre del agente String certAlias = aid.name; // deberemos crear una conexion por cada agente del usuario. connection = new Connection(); ConnectionSettings connectSettings = new ConnectionSettings(); connectSettings.setHost(c.getqpidHost()); connectSettings.setPort(c.getqpidPort()); connectSettings.setVhost(c.getqpidVhost()); connectSettings.setUsername(c.getqpidUser()); connectSettings.setPassword(c.getqpidPassword()); connectSettings.setUseSSL(c.getqpidSSL()); connectSettings.setSaslMechs(c.getqpidsaslMechs()); // Accedemos al fichero de configuración de seguridad del // usuario. connectSettings.setKeyStorePassword(propSecurity .getProperty("KeyStorePassword")); connectSettings.setKeyStorePath(propSecurity .getProperty("KeyStorePath")); // Lo convertimos a minusculas para que no haya problemas connectSettings.setCertAlias(certAlias.toLowerCase()); connectSettings.setTrustStorePassword(propSecurity .getProperty("TrustStorePassword")); connectSettings.setTrustStorePath(propSecurity .getProperty("TrustStorePath")); try { connection.connect(connectSettings); } catch (Exception e) { logger.error("Error in connect: " + e); } } propFile.close(); } else { if (AgentsConnection.connection == null) { logger.error("Before create a agent, the qpid broker connection is necessary"); throw new Exception("Error doesn't work the broken connection"); } else { this.connection = AgentsConnection.getConnection(); } } // Esta parte es la misma que cuando no es modo seguro. this.session = createSession(); // Create a session for trace event transmission this.traceSession = createTraceSession(); if (this.existAgent(aid)) { session.close(); traceSession.close(); throw new Exception("Agent ID " + aid.name + " already exists on the platform"); } else { this.aid = aid; this.listener = new Listener(); myThread = new Thread(this); createQueue(); createBind(); createSubscription(); // Install the listener for trace events this.traceListener = new TraceListener(); createEventQueue(); createTraceBind(); createTraceSubscription(); } // Initialize the map of update requests this.traceMaskUpdated = new ArrayList<String>(); // Initialize a trace mask with all the services not available. this.traceMask = new TraceMask(false); // Send trace event NEW_AGENT sendSystemTraceEvent(new TraceEvent( TracingService.DI_TracingServices[TracingService.NEW_AGENT] .getName(), new AgentID("system", aid.protocol, aid.host, aid.port), aid .toString())); }
void es.upv.dsic.gti_ia.core.BaseAgent.changeIdentity | ( | AgentID | _aid | ) | throws Exception |
Changes the name of the agent to acquire a new identity.
_aid | the new agent id |
Exception |
Definition at line 368 of file BaseAgent.java.
{ // Si el MMS nos da un certificado para el agente es que podemos crear // la conexión para ese agente. // o si ya tenemos un certificado válido para ese agente. if (st.generateAllProcessCertificate(_aid.name, propSecurity)) { ConnectionSettings connectSettings = new ConnectionSettings(); // Se abre por que se ha cerrado anteriormente. propFile = new FileInputStream( "./configuration/securityUser.properties"); propSecurity.load(propFile); /*************************************************************************** * Conexión como agente anterior **************************************************************************/ unbindExchange(); unbindTraceExchange(); session.queueDelete(aid.name); traceSession.queueDelete(aid.name + ".trace"); session.close(); traceSession.close(); /*************************************************************************** * Conexión como nuevo agente **************************************************************************/ aid = _aid; connection = null; String certAlias = aid.name; connectSettings.setHost(c.getqpidHost()); connectSettings.setPort(c.getqpidPort()); connectSettings.setVhost(c.getqpidVhost()); connectSettings.setUsername(c.getqpidUser()); connectSettings.setPassword(c.getqpidPassword()); connectSettings.setUseSSL(c.getqpidSSL()); connectSettings.setSaslMechs(c.getqpidsaslMechs()); connectSettings.setCertAlias(certAlias.toLowerCase()); connectSettings.setKeyStorePassword(propSecurity .getProperty("KeyStorePassword")); connectSettings.setKeyStorePath(propSecurity .getProperty("KeyStorePath")); connectSettings.setTrustStorePassword(propSecurity .getProperty("TrustStorePassword")); connectSettings.setTrustStorePath(propSecurity .getProperty("TrustStorePath")); connection = new Connection(); connection.connect(connectSettings); // Create new sessions. session = createSession(); traceSession = createTraceSession(); createQueue(); createBind(); createSubscription(); // Install the listener for trace events createEventQueue(); createTraceBind(); createTraceSubscription(); propFile.close(); logger.info("Identity " + this.aid + " changed successfully!!"); } else { logger.error("Agent ID " + _aid.name + " already exists on the platform"); } }
void es.upv.dsic.gti_ia.core.BaseAgent.execute | ( | ) | [protected] |
Method that defines all the logic and behavior of the agent. This method necessarily must be defined.
Reimplemented in es.upv.dsic.gti_ia.cAgents.CAgent, es.upv.dsic.gti_ia.core.BridgeAgentInOut, es.upv.dsic.gti_ia.trace.TraceManager, and es.upv.dsic.gti_ia.core.BridgeAgentOutIn.
Definition at line 1081 of file BaseAgent.java.
{ }
boolean es.upv.dsic.gti_ia.core.BaseAgent.existAgent | ( | AgentID | aid | ) | [protected] |
Returns true if an agent exists on the platform, false otherwise
aid | Agent ID to look for |
Definition at line 1307 of file BaseAgent.java.
Define activities such as finalizing resources, and every task necessary after execution of execute procedure. It is executed when the agent finalizes and may be overriden by the user.
Definition at line 1092 of file BaseAgent.java.
{ }
Returns a structure as the Agent Identificator formed by the name, protocol, host and port Agent.
Definition at line 1277 of file BaseAgent.java.
{
return this.aid;
}
String es.upv.dsic.gti_ia.core.BaseAgent.getName | ( | ) |
Returns the agent name
Definition at line 1266 of file BaseAgent.java.
{ return aid.name; }
This method should find and return the system TraceManager. (For now just returns the default TraceManager. It will be expandable in the future.)
Definition at line 1296 of file BaseAgent.java.
{
return TraceInteract.DEFAULT_TM_AID;
}
Returns the trace mask that is being used by this agent.
Reimplemented in es.upv.dsic.gti_ia.trace.TraceManager.
Definition at line 1286 of file BaseAgent.java.
void es.upv.dsic.gti_ia.core.BaseAgent.init | ( | ) | [protected] |
Define activities such as initialization resources, and every task necessary before execution of execute procedure. It will be executed when the agent will be launched and may be defined by the user.
Definition at line 1073 of file BaseAgent.java.
{ }
void es.upv.dsic.gti_ia.core.BaseAgent.onMessage | ( | ACLMessage | msg | ) | throws Exception [protected] |
Function that will be executed when the agent gets a message The user has to write his/her code here
msg | Message received |
Reimplemented in es.upv.dsic.gti_ia.core.BridgeAgentInOut, es.upv.dsic.gti_ia.trace.TraceManager, es.upv.dsic.gti_ia.cAgents.CAgent, es.upv.dsic.gti_ia.jason.JasonAgent, es.upv.dsic.gti_ia.jason.conversationsFactory.ConvJasonAgent, es.upv.dsic.gti_ia.core.SingleAgent, and es.upv.dsic.gti_ia.architecture.QueueAgent.
Definition at line 1133 of file BaseAgent.java.
{ }
void es.upv.dsic.gti_ia.core.BaseAgent.onTraceEvent | ( | TraceEvent | tEvent | ) | [protected] |
Function that will be executed after a previous preprocessing of a received trace event. The user might override this method if he wants to apply a custom postprocessing.
Reimplemented in es.upv.dsic.gti_ia.trace.TraceManager.
Definition at line 1194 of file BaseAgent.java.
{ }
void es.upv.dsic.gti_ia.core.BaseAgent.preOnMessage | ( | ACLMessage | msg | ) | throws Exception [protected] |
UNTIL NOT COMMENT
msg |
Exception |
Definition at line 1102 of file BaseAgent.java.
{ boolean preprocessed = false; if (msg.getContent() != null) { String[] content = msg.getContent().split("#"); if (msg.getPerformativeInt() == ACLMessage.AGREE && content[0].equals("UpdateMask")) { this.traceMask = new TraceMask(content[1]); this.traceMaskUpdated.remove(msg.getConversationId()); preprocessed = true; } } /* * If the ACLMessage has not been preprocessed, the default method, * which is supposed to be overriden by the user, is called. */ if (!preprocessed) { onMessage(msg); } }
Runs Agent's thread
Definition at line 1227 of file BaseAgent.java.
{ try { init(); execute(); finalize(); terminate(); } catch (Exception e) { try { finalize(); } catch (Exception ex) { terminate(); logger.error(this.aid.getLocalName() + " ended execution incorrectly: " + ex); ex.printStackTrace(); } terminate(); logger.error(this.aid.getLocalName() + " ended execution incorrectly: " + e); e.printStackTrace(); } }
void es.upv.dsic.gti_ia.core.BaseAgent.send | ( | ACLMessage | msg | ) |
Sends a ACLMessage to all specified recipients agents. If a message destination having another platform, this will be forwarded to BridgeAgentInOut agent.
msg |
Permite incluir un arroba en el nombre del agente destinatario. Condici�n Obligatoria para JADE. @ ser� reemplazado por ~
Permite incluir un arroba en el nombre del agente destinatario. Condición Obligatoria para JADE. @ será reemplazado por ~
Reimplemented in es.upv.dsic.gti_ia.cAgents.CAgent.
Definition at line 530 of file BaseAgent.java.
{ if (msg.getExchangeHeaders().isEmpty()) { msg.getReceiver().name = msg.getReceiver().name.replace('@', '~'); MessageTransfer xfr = new MessageTransfer(); xfr.destination("amq.direct"); xfr.acceptMode(MessageAcceptMode.EXPLICIT); xfr.acquireMode(MessageAcquireMode.PRE_ACQUIRED); DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProperties = new MessageProperties(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(bos); oos.writeObject(msg); oos.flush(); } catch (IOException e) { e.printStackTrace(); } xfr.setBody(bos.toByteArray()); // Esto forma parte de la implementación para el soporte del no // repudio por parte de los // agentes. // Obligamos a que en el mensaje se envie la identidad verdadera del // agente emisor. if (c.isSecureMode()) { try { messageProperties.setUserId(msg.getSender().name.toString() .getBytes("UTF-8")); } catch (java.io.UnsupportedEncodingException e) { logger.error("Caught exception " + e.toString()); } } for (int i = 0; i < msg.getTotalReceivers(); i++) { // If protocol is not qpid then the message goes outside the // platform if (!msg.getReceiver(i).protocol.equals("qpid")) { deliveryProps.setRoutingKey("BridgeAgentInOut"); } else { deliveryProps.setRoutingKey(msg.getReceiver(i).name); } session.messageTransfer(xfr.getDestination(), xfr .getAcceptMode(), xfr.getAcquireMode(), new Header( deliveryProps, messageProperties), xfr.getBodyBytes()); } } else { MessageTransfer xfr = new MessageTransfer(); xfr.destination("amq.match"); xfr.acceptMode(MessageAcceptMode.EXPLICIT); xfr.acquireMode(MessageAcquireMode.PRE_ACQUIRED); DeliveryProperties deliveryProps = new DeliveryProperties(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(bos); oos.writeObject(msg); oos.flush(); } catch (IOException e) { e.printStackTrace(); } xfr.setBody(bos.toByteArray()); // set message headers MessageProperties messageProperties = new MessageProperties(); Map<String, Object> messageHeaders = new HashMap<String, Object>(); for (String key : msg.getExchangeHeaders().keySet()) { messageHeaders.put(key, msg.getExchangeHeader(key)); } messageProperties.setApplicationHeaders(messageHeaders); Header header = new Header(deliveryProps, messageProperties); this.session .messageTransfer("amq.match", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, header, xfr.getBodyBytes()); } /* * Send trace event with the message if the corresponding service is * active in the mask (checked in sendTraceEvent(...) method). */ TracingService tsMsgSent = TracingService.DI_TracingServices[TracingService.MESSAGE_SENT]; try { sendTraceEvent(new TraceEvent(tsMsgSent.getName(), aid, msg .getReceiver().toString())); } catch (TraceServiceNotAllowedException e) { } /* * Send trace event with the detailed message if the corresponding * service is active in the mask (checked in sendTraceEvent(...) * method). */ TracingService tsMsgSentDtl = TracingService.DI_TracingServices[TracingService.MESSAGE_SENT_DETAIL]; try { sendTraceEvent(new TraceEvent(tsMsgSentDtl.getName(), aid, msg.toString())); } catch (TraceServiceNotAllowedException e) { } /* * msg.getReceiver().name = msg.getReceiver().name.replace('@', '~'); * MessageTransfer xfr = new MessageTransfer(); * xfr.destination("amq.direct"); * xfr.acceptMode(MessageAcceptMode.EXPLICIT); * xfr.acquireMode(MessageAcquireMode.PRE_ACQUIRED); DeliveryProperties * deliveryProps = new DeliveryProperties(); MessageProperties * messageProperties = new MessageProperties(); // Serialize message * content String body; // Performative body = msg.getPerformativeInt() * + "#"; // Sender body = body + msg.getSender().toString().length() + * "#" + msg.getSender().toString(); // receiver body = body + * msg.getReceiver().toString().length() + "#" + * msg.getReceiver().toString(); // reply to body = body + * msg.getReplyTo().toString().length() + "#" + * msg.getReplyTo().toString(); // language body = body + * msg.getLanguage().length() + "#" + msg.getLanguage(); // encoding * body = body + msg.getEncoding().length() + "#" + msg.getEncoding(); * // ontology body = body + msg.getOntology().length() + "#" + * msg.getOntology(); // protocol body = body + * msg.getProtocol().length() + "#" + msg.getProtocol(); // conversation * id body = body + msg.getConversationId().length() + "#" + * msg.getConversationId(); // reply with body = body + * msg.getReplyWith().length() + "#" + msg.getReplyWith(); // in reply * to body = body + msg.getInReplyTo().length() + "#" + * msg.getInReplyTo(); // reply by body = body + * msg.getReplyBy().length() + "#" + msg.getReplyBy(); // content body = * body + msg.getContent().length() + "#" + msg.getContent(); // * serialize message headers, it looks like: number of // * headers#key.length#key|value.length#value // number of headers body = * body + String.valueOf(msg.getHeaders().size()) + "#"; Map<String, * String> headers = new HashMap<String, String>(msg.getHeaders()); * Iterator<String> itr = headers.keySet().iterator(); String key; // * iterate through HashMap values iterator while (itr.hasNext()) { key = * itr.next(); body = body + key.length() + "#" + key; body = body + * headers.get(key).length() + "#" + headers.get(key); } * xfr.setBody(body); // Esto forma parte de la implementación para el * soporte del no repudio por parte de los // agentes. // Obligamos a * que en el mensaje se envie la identidad verdadera del agente emisor. * if (c.isSecureMode()) { try { * messageProperties.setUserId(msg.getSender().name.toString() * .getBytes("UTF-8")); } catch (java.io.UnsupportedEncodingException e) * { logger.error("Caught exception " + e.toString()); } } for (int i = * 0; i < msg.getTotalReceivers(); i++) { // If protocol is not qpid * then the message goes outside the // platform if * (!msg.getReceiver(i).protocol.equals("qpid")) { * deliveryProps.setRoutingKey("BridgeAgentInOut"); } else { * deliveryProps.setRoutingKey(msg.getReceiver(i).name); } * xfr.header(new Header(deliveryProps)); try { // Si el broker destruye * la session por una accion no permitida // realizada anteriormente. /* * if (session.getCommandsIn() == this.sessionCommandsIn + 1) { * this.reloadSession(); } */ /* * session.messageTransfer(xfr.getDestination(), xfr .getAcceptMode(), * xfr.getAcquireMode(), new Header( deliveryProps, messageProperties), * xfr.getBodyString()); } catch (SessionException e) { * this.reloadSession(); logger.error(e.getMessage()); } catch * (Exception e) { logger.error("Caught exception " + e.toString()); } } */ }
void es.upv.dsic.gti_ia.core.BaseAgent.sendTraceEvent | ( | TraceEvent | tEvent | ) | throws TraceServiceNotAllowedException |
Sends a trace event to the mgx.trace exchange
tEvent | Trace event which is to be sent |
TraceServiceNotAllowedException | if the custom tracing services are not allowed |
Definition at line 809 of file BaseAgent.java.
{ Integer bitIndex = -1; /* * Check if the trace event corresponds to a domain independent (DI) * tracing service. */ TracingService ts = TracingService.getDITracingServiceByName(tEvent .getTracingService()); /* * If it is null, the trace event is not related with a DI tracing * service, so the CUSTOM bit must be checked in the mask. * * Else, the trace event corresponds to a DI tracing service, so its * corresponding bit in the mask must be checked. */ if (ts == null) { bitIndex = TraceMask.CUSTOM; } else { bitIndex = ts.getMaskBitIndex(); } /* * If the bit is null (because the tracing service has not associated * bit in the mask) or it is active, proceed with the send. * * Else, throw an exception. */ if (bitIndex == null || this.traceMask.get(bitIndex) == true) { sendFilteredTraceEvent(tEvent); } else { throw new TraceServiceNotAllowedException(); } }
void es.upv.dsic.gti_ia.core.BaseAgent.terminate | ( | ) | [protected] |
Function that will be executed when the agent terminates.
The trace system is notified when the agent is about to disappear. Also the connection with Qpid is closed.
Reimplemented in es.upv.dsic.gti_ia.architecture.QueueAgent.
Definition at line 1203 of file BaseAgent.java.
{ sendSystemTraceEvent(new TraceEvent( TracingService.DI_TracingServices[TracingService.AGENT_DESTROYED] .getName(), new AgentID("system", aid.protocol, aid.host, aid.port), aid.toString())); try { this.unbindExchange(); this.unbindTraceExchange(); session.queueDelete(aid.name); session.close(); traceSession.queueDelete(aid.name + ".trace"); traceSession.close(); } catch (Exception e) { logger.error(this.aid.getLocalName() + " ended execution incorrectly: " + e); } }
TraceMask es.upv.dsic.gti_ia.core.BaseAgent.updateTraceMask | ( | ) | throws TraceSystemUnreachableException |
Request an update trace mask to Trace Manager. Blocking action with timeout.
Definition at line 996 of file BaseAgent.java.
{ // Building an ACL message. ACLMessage msg = new ACLMessage(ACLMessage.REQUEST); msg.setSender(this.getAid()); msg.setReceiver(this.getTraceManager()); msg.setContent("UpdateMask#any"); send(msg); int accumulatedTime = 0; int defaultTimeOut = 2000; String messageId = msg.getConversationId(); traceMaskUpdated.add(messageId); while (traceMaskUpdated.contains(messageId) && accumulatedTime < defaultTimeOut) { try { Thread.sleep(10); } catch (Exception e) { e.getMessage(); } accumulatedTime += 10; } if (traceMaskUpdated.contains(messageId)) throw new TraceSystemUnreachableException(); return traceMask.clone(); }
TraceMask es.upv.dsic.gti_ia.core.BaseAgent.updateTraceMask | ( | int | timeOut | ) | throws TraceSystemUnreachableException |
Request an update trace mask to Trace Manager. Blocking action with timeout.
timeOut | Time in milliseconds that the agent will be blocked. |
Definition at line 1036 of file BaseAgent.java.
{ // Building an ACL message. ACLMessage msg = new ACLMessage(ACLMessage.REQUEST); msg.setSender(this.getAid()); msg.setReceiver(this.getTraceManager()); msg.setContent("UpdateMask#any"); send(msg); int accumulatedTime = 0; String messageId = msg.getConversationId(); traceMaskUpdated.add(messageId); while (traceMaskUpdated.contains(messageId) && accumulatedTime < timeOut) { try { Thread.sleep(10); } catch (Exception e) { e.getMessage(); } accumulatedTime += 10; } if (traceMaskUpdated.contains(messageId)) throw new TraceSystemUnreachableException(); return traceMask.clone(); }
Logger es.upv.dsic.gti_ia.core.BaseAgent.logger = Logger.getLogger(BaseAgent.class) [protected] |
The logger variable considers to print any event that occurs by the agent
Reimplemented in es.upv.dsic.gti_ia.argAgents.ArgCAgent, es.upv.dsic.gti_ia.organization.OMS, and es.upv.dsic.gti_ia.organization.SF.
Definition at line 76 of file BaseAgent.java.
Session es.upv.dsic.gti_ia.core.BaseAgent.session [protected] |
QPid session used for communication.
Definition at line 97 of file BaseAgent.java.
final AgentID es.upv.dsic.gti_ia.core.BaseAgent.SYSTEM_AID = new AgentID(SYSTEM_NAME) [static] |
Constant with the AID of the system.
Definition at line 71 of file BaseAgent.java.
final String es.upv.dsic.gti_ia.core.BaseAgent.SYSTEM_NAME = "system" [static] |
Constant with the name of the system.
Definition at line 66 of file BaseAgent.java.
TraceMask es.upv.dsic.gti_ia.core.BaseAgent.traceMask [protected] |
Bit mask used to manage the trace interactions.
Definition at line 118 of file BaseAgent.java.
Session es.upv.dsic.gti_ia.core.BaseAgent.traceSession [protected] |
Qpid session used for tracing.
Definition at line 103 of file BaseAgent.java.