00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _WIN32
00035 #include "config.h"
00036 #else
00037 #include "winconfig.h"
00038 #endif
00039
00040 #include "include/ams.h"
00041 #include "include/agent.h"
00042 #include "include/data_structures.h"
00043 #include "include/mc_platform.h"
00044
00045 int
00046 ams_Destroy(ams_p ams)
00047 {
00048 MUTEX_DESTROY(ams->runflag_lock);
00049 free(ams->runflag_lock);
00050 COND_DESTROY(ams->runflag_cond);
00051 free(ams->runflag_cond);
00052 MUTEX_DESTROY(ams->waiting_lock);
00053 free(ams->waiting_lock);
00054 COND_DESTROY(ams->waiting_cond);
00055 free(ams->waiting_cond);
00056 free(ams);
00057 return MC_SUCCESS;
00058 }
00059
00060 ams_p
00061 ams_Initialize(mc_platform_p mc_platform)
00062 {
00063 ams_p ams;
00064 ams = (ams_p)malloc(sizeof(ams_t));
00065 CHECK_NULL(ams, exit(0););
00066 ams->mc_platform = mc_platform;
00067
00068 ams->runflag_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00069 CHECK_NULL(ams->runflag_lock, exit(0););
00070 MUTEX_INIT(ams->runflag_lock);
00071
00072 ams->runflag_cond = (COND_T*)malloc(sizeof(COND_T));
00073 CHECK_NULL(ams->runflag_cond, exit(0););
00074 COND_INIT(ams->runflag_cond);
00075
00076 ams->run = 0;
00077
00078 ams->waiting = 0;
00079 ams->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00080 MUTEX_INIT(ams->waiting_lock);
00081 ams->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00082 COND_INIT(ams->waiting_cond);
00083
00084 return ams;
00085 }
00086
00087 void
00088 ams_Print(ams_p ams)
00089 {
00090 int i;
00091 MCAgent_t agent;
00092 agent_queue_p alist;
00093
00094 alist = ams->mc_platform->agent_queue;
00095
00096 MUTEX_LOCK(alist->lock);
00097
00098 if(alist->size == 0)
00099 {
00100 MUTEX_UNLOCK(alist->lock);
00101 return;
00102 }
00103
00104
00105 printf("%d total agents on board.\n", alist->size);
00106 for(i=0; i<alist->size; i++)
00107 {
00108 agent = (MCAgent_t)ListSearch(alist->list, i);
00109 MUTEX_LOCK(agent->agent_status_lock);
00110 printf("Agent id: %lu, Connect id: %lu, status: %u\n",
00111 agent->id,
00112 agent->connect_id,
00113 agent->agent_status);
00114 MUTEX_UNLOCK(agent->agent_status_lock);
00115 }
00116
00117 MUTEX_UNLOCK(alist->lock);
00118 return;
00119 }
00120
00121 extern int
00122 ams_ManageAgentList(ams_p ams)
00123 {
00124
00125 MCAgent_t current_agent;
00126 int index = 0;
00127 agent_queue_p alist;
00128 mc_platform_p global;
00129 message_p message;
00130
00131 alist = ams->mc_platform->agent_queue;
00132 global = ams->mc_platform;
00133
00134
00135
00136 MUTEX_LOCK(alist->lock);
00137 for(index=0; index<alist->size; index++)
00138 {
00139 if((current_agent = (MCAgent_t)ListSearch(alist->list, index)))
00140 {
00141 if(current_agent->binary) {continue;}
00142 MUTEX_UNLOCK(alist->lock);
00143 MUTEX_LOCK(current_agent->lock);
00144 current_agent->orphan = 0;
00145 MUTEX_LOCK(global->quit_lock);
00146 MUTEX_LOCK(current_agent->agent_status_lock);
00147 if(global->quit && current_agent->agent_status != MC_WAIT_MESSGSEND) {
00148 MUTEX_UNLOCK(current_agent->agent_status_lock);
00149 MUTEX_UNLOCK(global->quit_lock);
00150 MUTEX_UNLOCK(current_agent->lock);
00151 MC_TerminateAgent(current_agent);
00152
00153 MUTEX_LOCK(current_agent->run_lock);
00154 MUTEX_UNLOCK(current_agent->run_lock);
00155 continue;
00156 } else {
00157 MUTEX_UNLOCK(current_agent->agent_status_lock);
00158 }
00159 MUTEX_UNLOCK(global->quit_lock);
00160 MUTEX_LOCK(current_agent->agent_status_lock);
00161 switch(current_agent->agent_status)
00162 {
00163 case MC_WAIT_CH :
00164 MUTEX_UNLOCK(current_agent->lock);
00165 MUTEX_UNLOCK(current_agent->agent_status_lock);
00166 agent_RunChScript(current_agent, global);
00167 break;
00168 case MC_AGENT_ACTIVE :
00169 MUTEX_UNLOCK(current_agent->lock);
00170 MUTEX_UNLOCK(current_agent->agent_status_lock);
00171
00172 break;
00173 case MC_WAIT_MESSGSEND :
00174 current_agent->agent_status = MC_WAIT_FINISHED;
00175 COND_BROADCAST(current_agent->agent_status_cond);
00176 MUTEX_UNLOCK(current_agent->agent_status_lock);
00177 MUTEX_UNLOCK(current_agent->lock);
00178 MUTEX_LOCK(ams->runflag_lock);
00179 ams->run = 1;
00180 MUTEX_UNLOCK(ams->runflag_lock);
00181 MUTEX_UNLOCK(current_agent->lock);
00182 message = message_New();
00183 if (
00184 message_InitializeFromAgent
00185 (
00186 ams->mc_platform,
00187 message,
00188 current_agent
00189 )
00190 )
00191 {
00192 fprintf(stderr, "Error initializing message from agent. %s:%d\n", __FILE__, __LINE__);
00193 message_Destroy(message);
00194 message = NULL;
00195 } else {
00196
00197
00198 current_agent->name = (char*)realloc(
00199 current_agent->name,
00200 sizeof(char) * (strlen(current_agent->name) + 10)
00201 );
00202 strcat(current_agent->name, "_SENDING");
00203 message_queue_Add(
00204 ams->mc_platform->message_queue,
00205 message
00206 );
00207 }
00208 break;
00209 case MC_AGENT_NEUTRAL :
00210 MUTEX_UNLOCK(current_agent->agent_status_lock);
00211 MUTEX_UNLOCK(current_agent->lock);
00212 break;
00213 case MC_WAIT_FINISHED :
00214 MUTEX_UNLOCK(current_agent->agent_status_lock);
00215 MUTEX_UNLOCK(current_agent->lock);
00216 agent_queue_RemoveIndex(alist, index);
00217
00218
00219
00220 index--;
00221 break;
00222 default :
00223 printf("ERROR IN AGENT FORMAT");
00224 printf("Agent Format %d not recognized.",
00225 current_agent->agent_status);
00226
00227 current_agent->agent_status = MC_WAIT_FINISHED;
00228 COND_BROADCAST(current_agent->agent_status_cond);
00229 MUTEX_UNLOCK(current_agent->agent_status_lock);
00230 MUTEX_UNLOCK(current_agent->lock);
00231 }
00232 } else {
00233 MUTEX_UNLOCK( alist->lock );
00234 }
00235 MUTEX_LOCK( alist->lock );
00236 }
00237 MUTEX_UNLOCK( alist->lock );
00238 return 0 ;
00239 }
00240
00241 void
00242 ams_Start(mc_platform_p mc_platform)
00243 {
00244 ams_p ams = mc_platform->ams;
00245 #ifndef _WIN32
00246 pthread_attr_t attr;
00247 pthread_attr_init(&attr);
00248 if(mc_platform->stack_size[MC_THREAD_AMS] != -1) {
00249 pthread_attr_setstacksize
00250 (
00251 &attr,
00252 mc_platform->stack_size[MC_THREAD_AMS]
00253 );
00254 }
00255 #else
00256 int stack_size;
00257 if (mc_platform->stack_size[MC_THREAD_AMS] < 1) {
00258
00259 stack_size = mc_platform->stack_size[MC_THREAD_AMS]+1;
00260 } else {
00261 stack_size = mc_platform->stack_size[MC_THREAD_AMS];
00262 }
00263 #endif
00264 THREAD_CREATE
00265 (
00266 &ams->thread,
00267 ams_Thread,
00268 mc_platform
00269 );
00270 }
00271 #ifndef _WIN32
00272 void*
00273 ams_Thread(void* arg)
00274 #else
00275 DWORD WINAPI
00276 ams_Thread( LPVOID arg )
00277 #endif
00278 {
00279 mc_platform_p mc_platform = (mc_platform_p)arg;
00280 ams_p ams = mc_platform->ams;
00281 int ams_thread_count = 0;
00282 while(1) {
00283 MUTEX_LOCK(ams->runflag_lock);
00284 MUTEX_LOCK(mc_platform->quit_lock);
00285 while(ams->run == 0 && !mc_platform->quit) {
00286 MUTEX_UNLOCK(mc_platform->quit_lock);
00287
00288 MUTEX_LOCK(ams->waiting_lock);
00289 ams->waiting = 1;
00290 COND_BROADCAST(ams->waiting_cond);
00291 MUTEX_UNLOCK(ams->waiting_lock);
00292
00293 COND_WAIT
00294 (
00295 ams->runflag_cond,
00296 ams->runflag_lock
00297 );
00298 MUTEX_LOCK(mc_platform->quit_lock);
00299 }
00300
00301 MUTEX_LOCK(ams->waiting_lock);
00302 ams->waiting = 0;
00303 COND_BROADCAST(ams->waiting_cond);
00304 MUTEX_UNLOCK(ams->waiting_lock);
00305 if (ams->run == 0 && mc_platform->quit) {
00306 MUTEX_UNLOCK(mc_platform->quit_lock);
00307 MUTEX_UNLOCK(ams->runflag_lock);
00308 ams_ManageAgentList(ams);
00309 THREAD_EXIT();
00310 }
00311 ams->run = 0;
00312 MUTEX_UNLOCK(mc_platform->quit_lock);
00313 MUTEX_UNLOCK(ams->runflag_lock);
00314 ams_ManageAgentList(ams);
00315 ams_thread_count++;
00316 }
00317 THREAD_EXIT();
00318 }