00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _WIN32
00035 #include "config.h"
00036 #else
00037 #include "winconfig.h"
00038 #endif
00039
00040 #include "include/ams.h"
00041 #include "include/agent.h"
00042 #include "include/data_structures.h"
00043 #include "include/mc_platform.h"
00044
00045 int
00046 ams_Destroy(ams_p ams)
00047 {
00048 MUTEX_DESTROY(ams->runflag_lock);
00049 free(ams->runflag_lock);
00050 COND_DESTROY(ams->runflag_cond);
00051 free(ams->runflag_cond);
00052 free(ams);
00053 return MC_SUCCESS;
00054 }
00055
00056 ams_p
00057 ams_Initialize(mc_platform_p mc_platform)
00058 {
00059 ams_p ams;
00060 ams = (ams_p)malloc(sizeof(ams_t));
00061 CHECK_NULL(ams, exit(0););
00062 ams->mc_platform = mc_platform;
00063
00064 ams->runflag_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00065 CHECK_NULL(ams->runflag_lock, exit(0););
00066 MUTEX_INIT(ams->runflag_lock);
00067
00068 ams->runflag_cond = (COND_T*)malloc(sizeof(COND_T));
00069 CHECK_NULL(ams->runflag_cond, exit(0););
00070 COND_INIT(ams->runflag_cond);
00071
00072 ams->run = 0;
00073
00074 ams->waiting = 0;
00075 ams->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00076 MUTEX_INIT(ams->waiting_lock);
00077 ams->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00078 COND_INIT(ams->waiting_cond);
00079
00080 return ams;
00081 }
00082
00083 void
00084 ams_Print(ams_p ams)
00085 {
00086 int i;
00087 MCAgent_t agent;
00088 agent_queue_p alist;
00089
00090 alist = ams->mc_platform->agent_queue;
00091
00092 MUTEX_LOCK(alist->lock);
00093
00094 if(alist->size == 0)
00095 {
00096 MUTEX_UNLOCK(alist->lock);
00097 return;
00098 }
00099
00100
00101 printf("%d total agents on board.\n", alist->size);
00102 for(i=0; i<alist->size; i++)
00103 {
00104 agent = (MCAgent_t)ListSearch(alist->list, i);
00105 printf("Agent id: %lu, Connect id: %lu, status: %u\n",
00106 agent->id,
00107 agent->connect_id,
00108 agent->agent_status);
00109 }
00110
00111 MUTEX_UNLOCK(alist->lock);
00112 return;
00113 }
00114
00115 extern int
00116 ams_ManageAgentList(ams_p ams)
00117 {
00118
00119 MCAgent_t current_agent;
00120 int index;
00121 agent_queue_p alist;
00122 mc_platform_p global;
00123 message_p message;
00124
00125 alist = ams->mc_platform->agent_queue;
00126 global = ams->mc_platform;
00127
00128
00129
00130 MUTEX_LOCK(alist->lock);
00131 for(index=0; index<alist->size; index++)
00132 {
00133 if((current_agent = ListSearch(alist->list, index)))
00134 {
00135 MUTEX_UNLOCK(alist->lock);
00136 MUTEX_LOCK(current_agent->lock);
00137 current_agent->orphan = 0;
00138 MUTEX_LOCK(global->quit_lock);
00139 if ( global->quit && current_agent->agent_status != MC_WAIT_MESSGSEND) {
00140 MUTEX_UNLOCK(global->quit_lock);
00141 MUTEX_UNLOCK(current_agent->lock);
00142 MC_TerminateAgent(current_agent);
00143
00144 MUTEX_LOCK(current_agent->run_lock);
00145 MUTEX_UNLOCK(current_agent->run_lock);
00146 continue;
00147 }
00148 MUTEX_UNLOCK(global->quit_lock);
00149 switch(current_agent->agent_status)
00150 {
00151 case MC_WAIT_CH :
00152 MUTEX_UNLOCK(current_agent->lock);
00153 agent_RunChScript(current_agent, global);
00154 break;
00155 case MC_AGENT_ACTIVE :
00156 MUTEX_UNLOCK(current_agent->lock);
00157
00158 break;
00159 case MC_WAIT_MESSGSEND :
00160 current_agent->agent_status = MC_WAIT_FINISHED;
00161 MUTEX_UNLOCK(current_agent->lock);
00162 MUTEX_LOCK(ams->runflag_lock);
00163 ams->run = 1;
00164 MUTEX_UNLOCK(ams->runflag_lock);
00165 MUTEX_UNLOCK(current_agent->lock);
00166 message = message_New();
00167 if (
00168 message_InitializeFromAgent
00169 (
00170 ams->mc_platform,
00171 message,
00172 current_agent
00173 )
00174 )
00175 {
00176 message_Destroy(message);
00177 message = NULL;
00178 } else {
00179
00180
00181 current_agent->name = realloc(
00182 current_agent->name,
00183 sizeof(char) * (strlen(current_agent->name) + 10)
00184 );
00185 strcat(current_agent->name, "_SENDING");
00186 message_queue_Add(
00187 ams->mc_platform->message_queue,
00188 message
00189 );
00190 }
00191 break;
00192 case MC_AGENT_NEUTRAL :
00193 MUTEX_UNLOCK(current_agent->lock);
00194 break;
00195 case MC_WAIT_FINISHED :
00196 MUTEX_UNLOCK(current_agent->lock);
00197 agent_queue_RemoveIndex(alist, index);
00198 index=0;
00199 break;
00200 default :
00201 printf("ERROR IN AGENT FORMAT");
00202 printf("Agent Format %d not recognized.",
00203 current_agent->agent_status);
00204
00205 current_agent->agent_status = MC_WAIT_FINISHED;
00206 MUTEX_UNLOCK(current_agent->lock);
00207 }
00208 } else {
00209 MUTEX_UNLOCK( alist->lock );
00210 }
00211 MUTEX_LOCK( alist->lock );
00212 }
00213 MUTEX_UNLOCK( alist->lock );
00214 return 0 ;
00215 }
00216
00217 void
00218 ams_Start(mc_platform_p mc_platform)
00219 {
00220 ams_p ams = mc_platform->ams;
00221 #ifndef _WIN32
00222 pthread_attr_t attr;
00223 pthread_attr_init(&attr);
00224 if(mc_platform->stack_size[MC_THREAD_AMS] != -1) {
00225 pthread_attr_setstacksize
00226 (
00227 &attr,
00228 mc_platform->stack_size[MC_THREAD_AMS]
00229 );
00230 }
00231 #else
00232 int stack_size;
00233 if (mc_platform->stack_size[MC_THREAD_AMS] < 1) {
00234
00235 stack_size = mc_platform->stack_size[MC_THREAD_AMS]+1;
00236 } else {
00237 stack_size = mc_platform->stack_size[MC_THREAD_AMS];
00238 }
00239 #endif
00240 THREAD_CREATE
00241 (
00242 &ams->thread,
00243 ams_Thread,
00244 mc_platform
00245 );
00246 }
00247 #ifndef _WIN32
00248 void*
00249 ams_Thread(void* arg)
00250 #else
00251 DWORD WINAPI
00252 ams_Thread( LPVOID arg )
00253 #endif
00254 {
00255 mc_platform_p mc_platform = (mc_platform_p)arg;
00256 ams_p ams = mc_platform->ams;
00257
00258 while(1) {
00259 MUTEX_LOCK(ams->runflag_lock);
00260 MUTEX_LOCK(mc_platform->quit_lock);
00261 while(ams->run == 0 && !mc_platform->quit) {
00262 MUTEX_UNLOCK(mc_platform->quit_lock);
00263
00264 MUTEX_LOCK(ams->waiting_lock);
00265 ams->waiting = 1;
00266 COND_BROADCAST(ams->waiting_cond);
00267 MUTEX_UNLOCK(ams->waiting_lock);
00268
00269 COND_WAIT
00270 (
00271 ams->runflag_cond,
00272 ams->runflag_lock
00273 );
00274 MUTEX_LOCK(mc_platform->quit_lock);
00275 }
00276
00277 MUTEX_LOCK(ams->waiting_lock);
00278 ams->waiting = 0;
00279 COND_BROADCAST(ams->waiting_cond);
00280 MUTEX_UNLOCK(ams->waiting_lock);
00281 if (ams->run == 0 && mc_platform->quit) {
00282 MUTEX_UNLOCK(mc_platform->quit_lock);
00283 MUTEX_UNLOCK(ams->runflag_lock);
00284 ams_ManageAgentList(ams);
00285 THREAD_EXIT();
00286 }
00287 ams->run = 0;
00288 MUTEX_UNLOCK(mc_platform->quit_lock);
00289 MUTEX_UNLOCK(ams->runflag_lock);
00290 ams_ManageAgentList(ams);
00291 }
00292 THREAD_EXIT();
00293 }