Magentix2
2.1.1
|
Public Member Functions | |
TraceManager (AgentID aid) throws Exception | |
TraceManager (AgentID aid, boolean monitorizable) throws Exception | |
TraceMask | getTraceMask () |
void | setTraceMask (TraceMask traceMask) |
void | shutdown () |
void | onTraceEvent (TraceEvent tEvent) |
Static Public Attributes | |
static final String | DEFAULT_TM_NAME = "qpid://TM@localhost:8080" |
static final AgentID | DEFAULT_TM_AID = new AgentID(DEFAULT_TM_NAME) |
Protected Member Functions | |
void | execute () |
void | onMessage (ACLMessage msg) |
Trace Manager entity definition.
The trace manager entity is an agent in charge of coordinating and managing the event trace process. Tracing entities have to interact with the trace manager through ACL messages in order to publish/unpublish their tracing service and in order to subscribe/unsubscribe from tracing services.
Definition at line 37 of file TraceManager.java.
es.upv.dsic.gti_ia.trace.TraceManager.TraceManager | ( | AgentID | aid | ) | throws Exception |
Constructor which creates and initializes a TraceManager with the monitorization flag set to 'false'.
Initialization tasks are internally performed by invoking the private method es.upv.dsic.gti_ia.trace.TraceManager#initialize(). These tasks are the following:
1) Creation of empty Tracing Entities, Tracing Service Providers, Tracing Service Subscribers and Tracing Services lists.
2) Add the trace manager to the Tracing Entities List.
3) Initialize the Tracing Services list with DI tracing services and add the trace manager as provider of those tracing services which are mandatory and requestable.
4) Subscribe to NEW_AGENT and AGENT_DESTROYED tracing services in order to be able to register tracing entities in the system
5) Send a system trace event of type WELCOME_TM to inform already existing agents of the arrival of the trace manager.
aid | AgentID which will be used to create the agent |
Exception |
Definition at line 137 of file TraceManager.java.
{ this(aid, false); }
es.upv.dsic.gti_ia.trace.TraceManager.TraceManager | ( | AgentID | aid, |
boolean | monitorizable | ||
) | throws Exception |
Constructor which creates and initializes a TraceManager.
Initialization tasks are internally performed by invoking the private method es.upv.dsic.gti_ia.trace.TraceManager#initialize(). These tasks are the following:
1) Creation of empty Tracing Entities, Tracing Service Providers, Tracing Service Subscribers and Tracing Services lists.
2) Add the trace manager to the Tracing Entities List.
3) Initialize the Tracing Services list with DI tracing services and add the trace manager as provider of those tracing services which are mandatory and requestable.
4) Subscribe to NEW_AGENT and AGENT_DESTROYED tracing services in order to be able to register tracing entities in the system
5) Send a system trace event of type WELCOME_TM to inform already existing agents of the arrival of the trace manager.
aid | AgentID which will be used to create the agent |
monitorizable | Value to which the monitorizable attribute of the class will be set |
Exception |
Definition at line 175 of file TraceManager.java.
{ super(aid); this.monitorizable = monitorizable; this.finishExecution = new Semaphore(0); if (monitorizable) { logger.info("[TRACE MANAGER]: " + this.getAid().toString() + " launched with monitorization..."); } else { logger.info("[TRACE MANAGER]: " + this.getAid().toString() + " launched..."); } logger.setLevel(Level.OFF); /* Obtain the trace mask from the configuration class */ this.traceMask = new TraceMask(conf.getTraceMask()); initialize(); }
void es.upv.dsic.gti_ia.trace.TraceManager.execute | ( | ) | [protected] |
Inherited from the class BaseAgent.
Reimplemented from es.upv.dsic.gti_ia.core.BaseAgent.
Definition at line 417 of file TraceManager.java.
{ try { this.finishExecution.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // Preparing a trace event to update the mask of all the agents. AgentID tmAID = this.getAid(); AgentID systemAID = new AgentID(SYSTEM_NAME, tmAID.protocol, tmAID.host, tmAID.port); TracingService newMaskTS = TracingService.DI_TracingServices[TracingService.NEW_MASK]; /* * The new trace mask will have all services unavailable and the DIE * bit active, since the trace manager has gone. */ TraceMask noTMMask = new TraceMask(false); noTMMask.set(TraceMask.DIE); TraceEvent noTMEvent = new TraceEvent(newMaskTS.getName(), systemAID, noTMMask.toString()); sendSystemTraceEvent(noTMEvent, null); } }
Returns the current trace mask of the system.
Reimplemented from es.upv.dsic.gti_ia.core.BaseAgent.
Definition at line 203 of file TraceManager.java.
void es.upv.dsic.gti_ia.trace.TraceManager.onMessage | ( | ACLMessage | msg | ) | [protected] |
Requests to the trace manager are sent via ACL messages which are processed in this method
msg | Message received |
Building a ACLMessage
Building a ACLMessage
Sending a ACLMessage
Building a ACLMessage
Sending a ACLMessage
Building a ACLMessage
Reimplemented from es.upv.dsic.gti_ia.core.BaseAgent.
Definition at line 456 of file TraceManager.java.
{ String content, auxContent, serviceName, serviceDescription, originEntity; Map<String, Object> arguments; int index, index2, length, counter; TraceEvent tEvent; // = new TraceEvent(); ACLMessage response_msg = null; String command, specification; TracingService tService = null; TracingEntity tEntity = null, originTEntity = null; TracingServiceSubscription tServiceSubscription = null; Iterator<TracingServiceSubscription> TSS_iter; Iterator<TracingService> TS_iter; Iterator<TracingEntity> TE_iter; AgentID originAid;// , requestedAid; int aidindice1 = 0; int aidindice2 = 0; String tEventContent; boolean agree_response = true; boolean added_TS = false; boolean added_TSP = false; boolean linked_TE_TS = false; boolean added_TSS = false; boolean error; logger.info("[TRACE MANAGER]: Received [" + msg.getPerformativeInt() + "] -> " + msg.getContent()); switch (msg.getPerformativeInt()) { case ACLMessage.REQUEST: content = msg.getContent(); index = content.indexOf('#', 0); command = content.substring(0, index); if (command.equals("publish")) { // Publication of a tracing service index2 = content.indexOf('#', index + 1); length = Integer.parseInt(content.substring(index + 1, index2)); serviceName = content.substring(index2 + 1, index2 + 1 + length); index = index2 + length + 1; serviceDescription = content.substring(index); // Check the CUSTOM bit of the trace mask. if (this.traceMask.get(TraceMask.CUSTOM) == false) { // The mask does not allow to publish tracing services. agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if ((tEntity = TracingEntities.getTEByAid(msg .getSender())) == null) { // Error getting the tracing entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.ENTITY_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (!TSProviderEntities.contains(tEntity)) { // Register tracing entity as service provider synchronized (TSProviderEntities) { error = TSProviderEntities.add(tEntity); } if (error) { added_TSP = true; } else { // Error adding the tracing entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.BAD_ENTITY); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } } // Add tracing service if (agree_response && ((tService = TracingServices.getTS(serviceName)) == null)) { // The tracing service does not exist tService = new TracingService(serviceName, serviceDescription); synchronized (TracingServices) { error = TracingServices.add(tService); } if (error) { added_TS = true; } else { // Impossible to add tracing service agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.BAD_SERVICE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } } // Link service provider and tracing service if (agree_response) { if (tService.getProviders().contains(tEntity) || tEntity.getPublishedTS().contains(tService)) { // Tracing service already published by the tracing // entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.SERVICE_DUPLICATE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else { synchronized (tService.getProviders()) { error = tService.getProviders().add(tEntity); } if (error) { synchronized (tEntity.getPublishedTS()) { error = tEntity.getPublishedTS().add( tService); } } if (!error) { // Impossible to link properly tracing service // and // provider agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName.length() + "#" + serviceName + TraceError.SUBSCRIPTION_ERROR); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else { linked_TE_TS = true; } } } if (agree_response) { tEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.PUBLISHED_TRACING_SERVICE] .getName(), tEntity, serviceName); try { sendTraceEvent(tEvent); } catch (TraceServiceNotAllowedException e) { logger.error("The tracing service PUBLISHED_TRACING_SERVICE must always be sent."); e.printStackTrace(); } // sendSystemTraceEvent(tEvent, tEntity); response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("publish#" + serviceName); logger.info("[TRACE MANAGER]: Sending AGREE message to " + msg.getReceiver().toString()); } else { if (linked_TE_TS) { synchronized (tService.getProviders()) { tService.getProviders().remove(tEntity); } synchronized (tEntity.getPublishedTS()) { tEntity.getPublishedTS().remove(tService); } } if (added_TS) { synchronized (TracingServices) { TracingServices.remove(tService); } } if (added_TSP) { synchronized (TSProviderEntities) { TSProviderEntities.remove(tEntity); } } } } else if (command.equals("unpublish")) { // Remove publication of a tracing service serviceName = content.substring(index + 1); // Check the CUSTOM bit of the trace mask. if (this.traceMask.get(TraceMask.CUSTOM) == false) { // The mask does not allow to publish tracing services. agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName.length() + "#" + serviceName + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if ((tService = TracingServices.getTS(serviceName)) == null) { // The tracing service does not exist agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName.length() + "#" + serviceName + TraceError.SERVICE_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (tService.getMandatory()) { // The tracing service cannot be unpublished agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName.length() + "#" + serviceName + TraceError.BAD_SERVICE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if ((tEntity = TSProviderEntities.getTEByAid(msg .getSender())) == null) { // The tracing entity does not offer the tracing service agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName.length() + "#" + serviceName + TraceError.BAD_SERVICE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (!tService.getProviders().contains(tEntity) || !tEntity.getPublishedTS().contains(tService)) { // Tracing service not published by the tracing entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName.length() + "#" + serviceName + TraceError.BAD_SERVICE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } // Remove all subscriptions and send the corresponding trace // events to subscriptors if (agree_response) { synchronized (tService.getSubscriptions()) { TSS_iter = tService.getSubscriptions().iterator(); if (tService.getProviders().size() == 1) { // Just one provider: remove all subscriptions while (TSS_iter.hasNext()) { tServiceSubscription = TSS_iter.next(); tServiceSubscription.getSubscriptorEntity().getSubscribedToTS().remove(tServiceSubscription); if (tServiceSubscription.getSubscriptorEntity().getSubscribedToTS().size() == 0) { synchronized (TSSubscriberEntities) { TSSubscriberEntities.remove(tServiceSubscription.getSubscriptorEntity()); } } TSS_iter.remove(); if (tServiceSubscription.getAnyProvider()) { tEventContent = serviceName + "#any"; // Remove subscription this.traceSession.exchangeUnbind(tServiceSubscription.getSubscriptorEntity().getAid().name + ".trace", "amq.match", serviceName + "#any", Option.NONE); } else { tEventContent = serviceName + msg.getSender(); // Remove subscription this.traceSession.exchangeUnbind(tServiceSubscription.getSubscriptorEntity().getAid().name + ".trace", "amq.match", serviceName + "#" + msg.getSender(), Option.NONE); } tEvent = new TraceEvent(TracingService.DI_TracingServices[TracingService.UNAVAILABLE_TS].getName(), new AgentID("system", this.getAid().protocol, this.getAid().host, this.getAid().port), tEventContent); sendSystemTraceEvent(tEvent,tServiceSubscription.getSubscriptorEntity()); } } else { while (TSS_iter.hasNext()) { tServiceSubscription = TSS_iter.next(); if (!tServiceSubscription.getAnyProvider() && tServiceSubscription.getOriginEntity().equals(tEntity)) { tServiceSubscription.getSubscriptorEntity().getSubscribedToTS().remove(tServiceSubscription); if (tServiceSubscription.getSubscriptorEntity().getSubscribedToTS().size() == 0) { synchronized (TSSubscriberEntities) { TSSubscriberEntities.remove(tServiceSubscription.getSubscriptorEntity()); } } TSS_iter.remove(); tEventContent = serviceName + msg.getSender(); // Remove subscription this.traceSession.exchangeUnbind(tServiceSubscription.getSubscriptorEntity().getAid().name + ".trace", "amq.match", serviceName + "#" + msg.getSender(), Option.NONE); tEvent = new TraceEvent(TracingService.DI_TracingServices[TracingService.UNAVAILABLE_TS].getName(), new AgentID("system", this.getAid().protocol, this.getAid().host, this.getAid().port), tEventContent); sendSystemTraceEvent(tEvent, tServiceSubscription.getSubscriptorEntity()); } } } } synchronized (TracingServices) { TracingServices.remove(tService); } synchronized (tEntity.getPublishedTS()) { tEntity.getPublishedTS().remove(tService); } synchronized (tService.getProviders()) { tService.getProviders().remove(tEntity); } if (tEntity.getPublishedTS().size() == 0) { synchronized (TSProviderEntities) { TSProviderEntities.remove(tEntity); } } tEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.UNPUBLISHED_TRACING_SERVICE] .getName(), tEntity, serviceName); try { sendTraceEvent(tEvent); } catch (TraceServiceNotAllowedException e) { logger.error("The tracing service UNPUBLISHED_TRACING_SERVICE must always be sent."); e.printStackTrace(); } // tEventContent=TracingService.DI_TracingServices[TracingService.UNPUBLISHED_TRACING_SERVICE].getName() // + // "#" + serviceName + "#" + tEntity.getAid(); // tEvent=new // TraceEvent(TracingService.DI_TracingServices[TracingService.UNPUBLISHED_TRACING_SERVICE].getName(), // tEntity, tEventContent); // sendSystemTraceEvent(tEvent, tEntity); response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unpublish#" + serviceName); logger.info("[TRACE MANAGER]: Sending AGREE message to " + msg.getReceiver().toString()); } } else if (command.equals("list")) { specification = content.substring(index + 1); if (specification.contentEquals("entities")) { // Check the LIST_ENTITIES bit of the trace mask. if (this.traceMask.get(TraceMask.LIST_ENTITIES) == false) { response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("list#entities#" + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else { // Return all available tracing entities response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); content = "list#entities#" + TracingEntities.size(); synchronized (TracingEntities) { TE_iter = TracingEntities.iterator(); while (TE_iter.hasNext()) { tEntity = TE_iter.next(); content = content + "#" + tEntity.getType() + "#" + tEntity.getAid().toString() .length() + "#" + tEntity.getAid().toString(); } } response_msg.setContent(content); } } // else if // (specification.contentEquals("service_entities")){ // // Return all tracing entities which offer a tracing // service // requestedAid=new AgentID // response_msg = new ACLMessage(ACLMessage.AGREE); // response_msg.setSender(this.getAid()); // response_msg.setReceiver(msg.getSender()); // // content="list#service_entities#"+ TracingEntities.size(); // // TE_iter=TracingEntities.iterator(); // // while(TE_iter.hasNext()){ // tEntity=TE_iter.next(); // // content = content + "#" + tEntity.getType() + "#" + // tEntity.getAid().toString().length() + "#" + // tEntity.getAid().toString(); // } // // response_msg.setContent(content); // } else if (specification.contentEquals("services")) { // Check the LIST_SERVICES bit of the trace mask. if (this.traceMask.get(TraceMask.LIST_SERVICES) == false) { response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("list#services#" + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else { // Return all tracing services allowed by the mask. counter = 0; response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); content = "list#services#"; auxContent = ""; synchronized (TracingServices) { TS_iter = TracingServices.iterator(); while (TS_iter.hasNext()) { tService = TS_iter.next(); if (tService.getMaskBitIndex() == null || this.traceMask.get(tService .getMaskBitIndex()) == true) { auxContent += "#" + tService.getName().length() + "#" + tService.getName() + "#" + tService.getDescription(); ++counter; } } } response_msg.setContent(content + Integer.toString(counter) + auxContent); } } // else if (specification.contentEquals("entity_services")){ // // Return all tracing services a tracing entity offers // response_msg = new ACLMessage(ACLMessage.AGREE); // response_msg.setSender(this.getAid()); // response_msg.setReceiver(msg.getSender()); // // content="list#services#"+ TracingServices.size(); // // TS_iter=TracingServices.iterator(); // // while(TS_iter.hasNext()){ // tService=TS_iter.next(); // // content = content + "#" + tService.getName().length() + // "#" + // tService.getName() + // "#" + tService.getDescription(); // } // // response_msg.setContent(content); // } else { index2 = content.indexOf('#',index+1); if(index2 > 0) specification = content.substring(index+1,index2); if (specification.contentEquals("service")) { // Return service description serviceName = content.substring(index2 + 1); if ((tService = TracingServices.getTS(serviceName)) == null || (tService.getMaskBitIndex() != null && this.traceMask .get(tService.getMaskBitIndex()) == false)) { /* * The tracing service does not exist or it is * related with a bit in the mask and that bit * is not active. */ agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("list#service#" + serviceName.length() + "#" + serviceName + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else { response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); content = "list#service#" + serviceName.length() + "#" + serviceName + tService.getDescription(); response_msg.setContent(content); } } else { /* * Building a ACLMessage */ response_msg = new ACLMessage(ACLMessage.UNKNOWN); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent(content); logger.info("[TRACE MANAGER]: Returning UNKNOWN message to " + msg.getReceiver().toString()); } } } else if (command.equals("UpdateMask")) { /* * Building a ACLMessage */ response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("UpdateMask#"+getTraceMask().toString()); response_msg.setConversationId(msg.getConversationId()); } else { /* * Building a ACLMessage */ response_msg = new ACLMessage(ACLMessage.UNKNOWN); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent(content); logger.info("[TRACE MANAGER]: Returning UNKNOWN message to " + msg.getReceiver().toString()); } send(response_msg); break; case ACLMessage.SUBSCRIBE: // Subscription to tracing services arguments = new HashMap<String, Object>(); content = msg.getContent(); if (content.equals("all")) { if (this.monitorizable) { if ((tEntity = TracingEntities.getTEByAid(msg .getSender())) == null) { // Tracing entity not found agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#3#all" + TraceError.ENTITY_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (!TSSubscriberEntities.contains(tEntity)) { // Register tracing entity as subscriber synchronized (TSSubscriberEntities) { error = TSSubscriberEntities.add(tEntity); } if (!error) { // Error adding the tracing entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#3#all" + TraceError.SUBSCRIPTION_ERROR); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } } if (agree_response) { synchronized (TracingServices) { TS_iter = TracingServices.iterator(); while (TS_iter.hasNext()) { tService = TS_iter.next(); // Subscribe to the tracing service if (tEntity.getSubscribedToTS().getTSS( tEntity, null, tService) != null || (tService.getMaskBitIndex() != null && this.traceMask .get(tService .getMaskBitIndex()) == false)) { /* * The subscription already exists or * the tracing service is related with a * bit in the mask that is off. */ continue; } else { // Add subscription tServiceSubscription = new TracingServiceSubscription( tEntity, null, tService); tService.addSubscription(tServiceSubscription); tEntity.addSubscription(tServiceSubscription); if (!tService .getName() .contentEquals( TracingService.DI_TracingServices[TracingService.SUBSCRIBE] .getName()) && !tService .getName() .contentEquals( TracingService.DI_TracingServices[TracingService.UNSUBSCRIBE] .getName())) { arguments.put("x-match", "all"); arguments.put("tracing_service", tService.getName()); this.traceSession .exchangeBind( msg.getSender().name + ".trace", "amq.match", tService.getName() + "#any", arguments); } // Send system trace event tEventContent = tService.getName() + "#" + tService.getDescription() .length() + " " + tService.getDescription() + "#any"; tEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.SUBSCRIBE] .getName(), tEntity, tEventContent); sendSystemTraceEvent(tEvent, tServiceSubscription .getSubscriptorEntity()); } } } response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#3#all"); logger.info("[TRACE MANAGER]: sending message to " + msg.getReceiver().toString()); } } else { // Not in monitorizable mode agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#3#all" + TraceError.AUTHORIZATION_ERROR); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } } else { index = content.indexOf('#', 0); serviceName = content.substring(0, index); originEntity = content.substring(index + 1); if (!originEntity.contentEquals("any")) { originAid = new AgentID(originEntity); /*originAid = new AgentID(); aidindice1 = 0; aidindice2 = originEntity.indexOf(':'); if (aidindice2 - aidindice1 <= 0) originAid.protocol = ""; else originAid.protocol = originEntity.substring( aidindice1, aidindice2); aidindice1 = aidindice2 + 3; aidindice2 = originEntity.indexOf('@', aidindice1); if (aidindice2 - aidindice1 <= 0) originAid.name = ""; else originAid.name = originEntity.substring(aidindice1, aidindice2); aidindice1 = aidindice2 + 1; aidindice2 = originEntity.indexOf(':', aidindice1); if (aidindice2 - aidindice1 <= 0) originAid.host = ""; else originAid.host = originEntity.substring(aidindice1, aidindice2); originAid.port = originEntity.substring(aidindice2 + 1);*/ } else { originAid = null; originTEntity = null; } if ((tEntity = TracingEntities.getTEByAid(msg.getSender())) == null) { // Tracing entity not found agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.ENTITY_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if ((tService = TracingServices.getTS(serviceName)) == null) { // The tracing service does not exist agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SERVICE_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (!tService.getRequestable()) { // The tracing service is not requestable agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.BAD_SERVICE); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if ((originAid != null) && ((originTEntity = tService.getProviders() .getTEByAid(originAid)) == null)) { // Tracing service not published by the origin tracing // entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SERVICE_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } // Check if the subscription already exists else if (TSSubscriberEntities.contains(tEntity) && (tEntity.getSubscribedToTS().getTSS(tEntity, originTEntity, tService) != null)) { // The subscription already exists agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SUBSCRIPTION_DUPLICATE); logger.info("[TRACE MANAGER]: sending REFUSE message to " + msg.getReceiver().toString()); } else if (tService.getMaskBitIndex() != null && this.traceMask.get(tService.getMaskBitIndex()) == false) { // Tracing service not allowed by the mask. agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } else if (!TSSubscriberEntities.contains(tEntity)) { // Register tracing entity as subscriber synchronized (TSSubscriberEntities) { error = TSSubscriberEntities.add(tEntity); } if (error) { added_TSS = true; } else { // Error adding the tracing entity agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SUBSCRIPTION_ERROR); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } } if (agree_response) { // Add subscription tServiceSubscription = new TracingServiceSubscription( tEntity, originTEntity, tService); synchronized (tService.getSubscriptions()) { tService.addSubscription(tServiceSubscription); } synchronized (tEntity.getSubscribedToTS()) { tEntity.addSubscription(tServiceSubscription); } arguments.put("x-match", "all"); arguments.put("tracing_service", serviceName); if (!originEntity.contentEquals("any")) { arguments.put("origin_entity", originEntity); } this.traceSession.exchangeBind(msg.getSender().name + ".trace", "amq.match", serviceName + "#" + originEntity, arguments); // Send system trace event // tEntity=new TracingEntity(TracingEntity.AGENT, // new AgentID("system", this.getAid().protocol, // this.getAid().host, this.getAid().port)); // tEventContent=tService.getName() + "#" + // originEntity; tEventContent = serviceName + "#" + tService.getDescription().length() + "#" + tService.getDescription() + "#" + originEntity; tEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.SUBSCRIBE] .getName(), tEntity, tEventContent); sendSystemTraceEvent(tEvent, tServiceSubscription.getSubscriptorEntity()); response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("subscribe#" + serviceName.length() + "#" + serviceName + "#" + originEntity); // logger.info("[TRACE MANAGER]: sending message to " + // msg.getReceiver().toString()); } else { if (added_TSS) { synchronized (TSSubscriberEntities) { TSSubscriberEntities.remove(tEntity); } } } } send(response_msg); break; case ACLMessage.CANCEL: // Unsubscription from a tracing service content = msg.getContent(); index = content.indexOf('#', 0); serviceName = content.substring(0, index); originEntity = content.substring(index + 1); if (!originEntity.contentEquals("any")) { originAid = new AgentID(originEntity); /*originAid = new AgentID(); aidindice1 = 0; aidindice2 = originEntity.indexOf(':'); if (aidindice2 - aidindice1 <= 0) originAid.protocol = ""; else originAid.protocol = originEntity.substring(aidindice1, aidindice2); aidindice1 = aidindice2 + 3; aidindice2 = originEntity.indexOf('@', aidindice1); if (aidindice2 - aidindice1 <= 0) originAid.name = ""; else originAid.name = originEntity.substring(aidindice1, aidindice2); aidindice1 = aidindice2 + 1; aidindice2 = originEntity.indexOf(':', aidindice1); if (aidindice2 - aidindice1 <= 0) originAid.host = ""; else originAid.host = originEntity.substring(aidindice1, aidindice2); originAid.port = originEntity.substring(aidindice2 + 1);*/ } else { originAid = null; originTEntity = null; } if ((tService = TracingServices.getTS(serviceName)) == null) { // The tracing service does not exist agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unsubscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SERVICE_NOT_FOUND); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } // Check if the subscription exists else if (((tEntity = TSSubscriberEntities.getTEByAid(msg .getSender())) == null) || ((originAid != null) && (originTEntity = tService .getProviders().getTEByAid(originAid)) == null) || ((tServiceSubscription = tEntity.getSubscribedToTS() .getTSS(tEntity, originTEntity, tService)) == null)) { // The subscription does not exist agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unsubscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SUBSCRIPTION_NOT_FOUND); logger.info("[TRACE MANAGER]: sending REFUSE message to " + msg.getReceiver().toString()); } // Check if the trace mask allows this operation. else if (tService.getMaskBitIndex() != null && this.traceMask.get(tService.getMaskBitIndex()) == false) { // The tracing service is not allowed by the mask. agree_response = false; response_msg = new ACLMessage(ACLMessage.REFUSE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unsubscribe#" + serviceName.length() + "#" + serviceName + originEntity.length() + "#" + originEntity + TraceError.SERVICE_NOT_ALLOWED); logger.info("[TRACE MANAGER]: Sending REFUSE message to " + msg.getReceiver().toString()); } if (agree_response) { // Remove subscription synchronized (tEntity.getSubscribedToTS()) { tEntity.getSubscribedToTS() .remove(tServiceSubscription); } if (tEntity.getSubscribedToTS().size() == 0) { synchronized (TSSubscriberEntities) { TSSubscriberEntities.remove(tEntity); } } synchronized (tService.getSubscriptions()) { tService.getSubscriptions() .remove(tServiceSubscription); } this.traceSession.exchangeUnbind(msg.getSender().name + ".trace", "amq.match", serviceName + "#" + originEntity, Option.NONE); // logger.info("[TRACE MANAGER]: unbinding " + // msg.getSender().name+".trace from " + eventType); // Send system trace event tEventContent = serviceName + "#" + originEntity; tEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.UNSUBSCRIBE] .getName(), tEntity, tEventContent); sendSystemTraceEvent(tEvent, tEntity); response_msg = new ACLMessage(ACLMessage.AGREE); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent("unsubscribe#" + serviceName.length() + "#" + serviceName + "#" + originEntity); logger.info("[TRACE MANAGER]: sending AGREE message to " + msg.getReceiver().toString()); } send(response_msg); break; default: response_msg = new ACLMessage(ACLMessage.UNKNOWN); response_msg.setSender(this.getAid()); response_msg.setReceiver(msg.getSender()); response_msg.setContent(msg.getContent()); logger.info("Mensaje received in " + this.getName() + " agent, by onMessage: " + msg.getContent()); logger.info("[TRACE MANAGER]: returning UNKNOWN message to " + msg.getReceiver().toString()); send(response_msg); } }
void es.upv.dsic.gti_ia.trace.TraceManager.onTraceEvent | ( | TraceEvent | tEvent | ) |
Function that will be executed after a previous preprocessing of a received trace event. The user might override this method if he wants to apply a custom postprocessing.
Reimplemented from es.upv.dsic.gti_ia.core.BaseAgent.
Definition at line 1523 of file TraceManager.java.
{ TracingEntity tEntity; TracingService tService; TracingServiceSubscription tServiceSubscription; Iterator<TracingService> TS_iter; Iterator<TracingServiceSubscription> TSS_iter; TraceEvent responseTEvent; boolean error; if (tEvent.getTracingService().contentEquals( TracingService.DI_TracingServices[TracingService.NEW_AGENT] .getName()) == true) { // Register tracing entity tEntity = new TracingEntity(TracingEntity.AGENT, new AgentID( tEvent.getContent())); synchronized (TracingEntities) { error = TracingEntities.add(tEntity); } if (error) { for (int i = 0; i < TracingService.MAX_DI_TS; i++) { if (TracingService.DI_TracingServices[i].getRequestable()) { tService = TracingServices .getTS(TracingService.DI_TracingServices[i] .getName()); synchronized (tEntity.getPublishedTS()) { tEntity.getPublishedTS().add(tService); } synchronized (tService.getProviders()) { tService.getProviders().add(tEntity); } } } } else { // TODO: TRACE_ERROR ?? // System.out.println("ERROR"); } // Send a NEW_MASK trace event with the new trace mask. responseTEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.NEW_MASK] .getName(), new AgentID(SYSTEM_NAME, this.getAid().protocol, this .getAid().host, this.getAid().port), this .getTraceMask().toString()); this.sendSystemTraceEvent(responseTEvent, tEntity); } else if (tEvent .getTracingService() .contentEquals( TracingService.DI_TracingServices[TracingService.AGENT_DESTROYED] .getName()) == true) { // Unregister tracing entity tEntity = TracingEntities.getTEByAid(new AgentID(tEvent .getContent())); // Cancel subscriptions of that tracing entity to any tracing // service if (TSSubscriberEntities.contains(tEntity)) { // There are subscriptions to cancel synchronized (tEntity.getSubscribedToTS()) { TSS_iter = tEntity.getSubscribedToTS().iterator(); while (TSS_iter.hasNext()) { tServiceSubscription = TSS_iter.next(); TracingServices .getTS(tServiceSubscription.getTracingService() .getName()).getSubscriptions() .remove(tServiceSubscription); TSS_iter.remove(); } } synchronized (TSSubscriberEntities) { TSSubscriberEntities.remove(tEntity); } } // Unpublish tracing services the tracing entity was offering if (TSProviderEntities.contains(tEntity)) { // There are services which may have to be unpublished synchronized (tEntity.getPublishedTS()) { TS_iter = tEntity.getPublishedTS().iterator(); while (TS_iter.hasNext()) { tService = TS_iter.next(); synchronized (tService.getSubscriptions()) { TSS_iter = tService.getSubscriptions().iterator(); if (tService.getProviders().size() == 1) { // Just one provider: remove all subscriptions // and inform to subscriptors while (TSS_iter.hasNext()) { tServiceSubscription = TSS_iter.next(); tServiceSubscription.getSubscriptorEntity() .getSubscribedToTS() .remove(tServiceSubscription); if (tServiceSubscription .getSubscriptorEntity() .getSubscribedToTS().size() == 0) { synchronized (TSSubscriberEntities) { TSSubscriberEntities .remove(tServiceSubscription .getSubscriptorEntity()); } } TSS_iter.remove(); responseTEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.UNAVAILABLE_TS] .getName(), new AgentID( "system", this.getAid().protocol, this.getAid().host, this .getAid().port), ""); if (tServiceSubscription.getAnyProvider()) { responseTEvent.setContent(tService .getName() + "#any"); // Remove subscription this.traceSession.exchangeUnbind( tServiceSubscription .getSubscriptorEntity() .getAid().name + ".trace", "amq.match", tService.getName() + "#any", Option.NONE); } else { responseTEvent.setContent(tService .getName() + tEntity.getAid().toString()); // Remove subscription this.traceSession.exchangeUnbind( tServiceSubscription .getSubscriptorEntity() .getAid().name + ".trace", "amq.match", tService.getName() + tEntity.getAid() .toString(), Option.NONE); } sendSystemTraceEvent(responseTEvent, tServiceSubscription .getSubscriptorEntity()); } } else { while (TSS_iter.hasNext()) { tServiceSubscription = TSS_iter.next(); if (!tServiceSubscription.getAnyProvider() && tServiceSubscription .getOriginEntity().equals( tEntity)) { tServiceSubscription .getSubscriptorEntity() .getSubscribedToTS() .remove(tServiceSubscription); if (tServiceSubscription .getSubscriptorEntity() .getSubscribedToTS().size() == 0) { synchronized (TSSubscriberEntities) { TSSubscriberEntities .remove(tServiceSubscription .getSubscriptorEntity()); } } TSS_iter.remove(); responseTEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.UNAVAILABLE_TS] .getName(), new AgentID("system", this .getAid().protocol, this.getAid().host, this.getAid().port), tService.getName() + tEntity.getAid() .toString()); // Remove subscription this.traceSession.exchangeUnbind( tServiceSubscription .getSubscriptorEntity() .getAid().name + ".trace", "amq.match", tService.getName() + tEntity.getAid() .toString(), Option.NONE); sendSystemTraceEvent(responseTEvent, tServiceSubscription .getSubscriptorEntity()); } } } } TS_iter.remove(); synchronized (tService.getProviders()) { tService.getProviders().remove(tEntity); } } } synchronized (TSProviderEntities) { TSProviderEntities.remove(tEntity); } } synchronized (TracingEntities) { TracingEntities.remove(tEntity); } } }
void es.upv.dsic.gti_ia.trace.TraceManager.setTraceMask | ( | TraceMask | traceMask | ) |
Set a new trace mask in the system.
traceMask | the trace mask to set |
Definition at line 213 of file TraceManager.java.
{ this.traceMask = traceMask.clone(); // Expand the new trace mask to other agents. TraceEvent newMaskEvent = new TraceEvent( TracingService.DI_TracingServices[TracingService.NEW_MASK] .getName(), new AgentID(SYSTEM_NAME, this.getAid().protocol, this.getAid().host, this.getAid().port), traceMask .toString()); this.sendSystemTraceEvent(newMaskEvent, null); }
Finish the execution of the TraceManager.
Definition at line 445 of file TraceManager.java.
{ this.finishExecution.release(); }
final AgentID es.upv.dsic.gti_ia.trace.TraceManager.DEFAULT_TM_AID = new AgentID(DEFAULT_TM_NAME) [static] |
Constant with the agent id of the default trace manager.
Definition at line 50 of file TraceManager.java.
final String es.upv.dsic.gti_ia.trace.TraceManager.DEFAULT_TM_NAME = "qpid://TM@localhost:8080" [static] |
Constant with the name of the default trace manager.
Definition at line 45 of file TraceManager.java.