00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #ifndef _WIN32
00036 #include <sys/socket.h>
00037 #include <arpa/inet.h>
00038 #include <netinet/in.h>
00039 #include <netdb.h>
00040 #include <sys/un.h>
00041 #include <unistd.h>
00042 #include <sys/time.h>
00043 #include <netdb.h>
00044 #include <pthread.h>
00045 #else
00046 #include <winsock.h>
00047 #include <windows.h>
00048 #include <time.h>
00049 #endif
00050 
00051 #include <stdlib.h>
00052 #include "include/acc.h"
00053 #include "include/connection.h"
00054 #include "include/data_structures.h"
00055 #include "include/macros.h"
00056 #include "include/mc_error.h"
00057 #include "include/mc_platform.h"
00058 #include "include/message.h"
00059 #include "include/mtp_http.h"
00060 #include "include/xml_parser.h"
00061 #include "include/fipa_acl_envelope.h"
00062 
00063 #define BACKLOG 10
00064 
00065 acc_p
00066 acc_Initialize(struct mc_platform_s* mc_platform)
00067 {
00068   acc_p acc;
00069   acc = (acc_p)malloc(sizeof(acc_t));
00070   acc->mc_platform = mc_platform;
00071 
00072   acc->waiting = 0;
00073   acc->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00074   MUTEX_INIT(acc->waiting_lock);
00075   acc->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00076   COND_INIT(acc->waiting_cond);
00077 
00078   return acc;
00079 }
00080 
00081   int 
00082 acc_Destroy(acc_p acc)
00083 {
00084   if(acc == NULL) {
00085     return MC_SUCCESS;
00086   }
00087   free(acc);
00088   acc = NULL;
00089   return MC_SUCCESS;
00090 }
00091 
00092 #ifndef _WIN32
00093   void*
00094 acc_MessageHandlerThread(void* arg)
00095 #else
00096   DWORD WINAPI
00097 acc_MessageHandlerThread(LPVOID arg)
00098 #endif
00099 {
00100   mc_platform_p mc_platform = (mc_platform_p)arg;
00101   message_p message;
00102   agent_p agent;
00103   int mobile_agent_counter = 1;
00104   char* tmpstr;
00105   char* origname;
00106   int i;
00107 
00108   while(1) 
00109   {
00110     MUTEX_LOCK(mc_platform->message_queue->lock);
00111     MUTEX_LOCK(mc_platform->quit_lock);
00112     while(mc_platform->message_queue->size == 0 && !mc_platform->quit) {
00113       MUTEX_UNLOCK(mc_platform->quit_lock);
00114       COND_WAIT(
00115          mc_platform->message_queue->cond,
00116          mc_platform->message_queue->lock );
00117       MUTEX_LOCK(mc_platform->quit_lock);
00118     }
00119     if (mc_platform->message_queue->size == 0 && mc_platform->quit)
00120     {
00121       MUTEX_UNLOCK(mc_platform->quit_lock);
00122       MUTEX_UNLOCK(mc_platform->message_queue->lock);
00123       return 0;
00124     }
00125 
00126     MUTEX_UNLOCK(mc_platform->quit_lock);
00127     MUTEX_UNLOCK(mc_platform->message_queue->lock);
00128     message = message_queue_Pop(mc_platform->message_queue);
00129     if (message == NULL) {
00130       printf("POP ERROR\n");
00131       continue;
00132     }
00133     
00134     MUTEX_LOCK(mc_platform->MC_signal_lock);
00135     mc_platform->MC_signal = MC_RECV_MESSAGE;
00136     COND_BROADCAST(mc_platform->MC_signal_cond);
00137     MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00138     MUTEX_LOCK(mc_platform->giant_lock);
00139     while(mc_platform->giant == 0) {
00140       COND_WAIT (
00141           mc_platform->giant_cond,
00142           mc_platform->giant_lock);
00143     }
00144     MUTEX_UNLOCK(mc_platform->giant_lock);
00145     
00146 
00147     if(message->to_address == NULL) {
00148       
00149 
00150       switch(message->message_type) {
00151         case MOBILE_AGENT:
00152           agent = agent_Initialize(
00153               mc_platform,
00154               message,
00155               mobile_agent_counter);
00156           if (agent != NULL) {
00157             
00158             i = 1;
00159             if(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00160               origname = agent->name;
00161               while(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00162                 
00163 
00164                 tmpstr = (char*)malloc(sizeof(char) * strlen(origname) + 7);
00165                 sprintf(tmpstr, "%s_%04d", origname, i);
00166                 agent->name = tmpstr;
00167                 i++;
00168               }
00169               fprintf(stderr, "Warning: Agent '%s' has been renamed to '%s'.\n",
00170                   origname, agent->name);
00171               free(origname);
00172             }
00173             mobile_agent_counter++;
00174             agent_queue_Add(
00175                 mc_platform->agent_queue,
00176                 agent);
00177           }
00178           message_Destroy(message);
00179           
00180           MUTEX_LOCK(mc_platform->MC_signal_lock);
00181           mc_platform->MC_signal = MC_RECV_AGENT;
00182           COND_BROADCAST(mc_platform->MC_signal_cond);
00183           MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00184           MUTEX_LOCK(mc_platform->giant_lock);
00185           while(mc_platform->giant == 0) {
00186             COND_WAIT(mc_platform->giant_cond,
00187                 mc_platform->giant_lock);
00188           }
00189           MUTEX_UNLOCK(mc_platform->giant_lock);
00190           
00191           MUTEX_LOCK(mc_platform->ams->runflag_lock);
00192           mc_platform->ams->run = 1;
00193           COND_BROADCAST(mc_platform->ams->runflag_cond);
00194           MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00195           break;
00196         case FIPA_ACL:
00197           
00198 
00199           break;
00200         case RETURN_MSG:
00201           
00202           agent = agent_Initialize(
00203               mc_platform,
00204               message,
00205               mobile_agent_counter);
00206           if (agent != NULL) {
00207             MUTEX_LOCK(agent->lock);
00208             agent->datastate->persistent = 1;
00209             agent->agent_status = MC_AGENT_NEUTRAL;
00210             MUTEX_UNLOCK(agent->lock);
00211             mobile_agent_counter++;
00212             agent_queue_Add(
00213                 mc_platform->agent_queue,
00214                 agent);
00215           }
00216           message_Destroy(message);
00217           
00218           MUTEX_LOCK(mc_platform->MC_signal_lock);
00219           mc_platform->MC_signal = MC_RECV_RETURN;
00220           COND_BROADCAST(mc_platform->MC_signal_cond);
00221           MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00222           MUTEX_LOCK(mc_platform->giant_lock);
00223           while(mc_platform->giant == 0) {
00224             COND_WAIT(
00225                 mc_platform->giant_cond,
00226                 mc_platform->giant_lock);
00227           }
00228           MUTEX_UNLOCK(mc_platform->giant_lock);
00229           
00230           MUTEX_LOCK(mc_platform->ams->runflag_lock);
00231           mc_platform->ams->run = 1;
00232           COND_BROADCAST(mc_platform->ams->runflag_cond);
00233           MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00234           break;
00235           
00236 #ifdef MC_SECURITY
00237         case ENCRYPTED_DATA:
00238           if (mc_platform->enable_security) {
00239             message_queue_Add
00240               (
00241                mc_platform->asm_message_queue,
00242                message
00243               );
00244           } else {
00245             WARN("mc_security not enabled. Discarding ENCRYPTED_DATA message.");
00246             message_Destroy(message);
00247           }
00248           break;
00249         case ENCRYPTION_INITIALIZE: 
00250           if (mc_platform->enable_security) {
00251             asm_queue_Add
00252               (
00253                mc_platform->asm_queue,
00254                asm_node_Initialize(message, mc_platform->security_manager)
00255               );
00256             
00257             MUTEX_LOCK(mc_platform->asm_message_queue->lock);
00258             COND_SIGNAL(mc_platform->asm_message_queue->cond);
00259             MUTEX_UNLOCK(mc_platform->asm_message_queue->lock);
00260             message_Destroy(message);
00261           } else {
00262             WARN("mc_security not enabled. Discarding ENCRYPTION_INITIALIZE message.");
00263             message_Destroy(message);
00264           }
00265           break;
00266         case REQUEST_ENCRYPTION_INITIALIZE:
00267           if (mc_platform->enable_security) {
00268             asm_SendEncryptionData(
00269                 mc_platform->security_manager,
00270                 message->from_address
00271                 );
00272             message_Destroy(message);
00273           } else {
00274             WARN("mc_security not enabled. Discarding REQUEST_ENCRYPTION_INITIALIZE message.");
00275             message_Destroy(message);
00276           }
00277           break;
00278 #endif 
00279         case RELAY:
00280         case REQUEST:
00281         case SUBSCRIBE:
00282         case CANCEL:
00283         case N_UNDRSTD:
00284         case QUER_IF:
00285         case QUER_REF:
00286         case AGENT_UPDATE:
00287           fprintf(stderr, "FIXME: Message type %d not processable.%s:%d\n",
00288               message->message_type, __FILE__, __LINE__ );
00289           message_Destroy(message);
00290           break;
00291         default:
00292           fprintf(stderr, "Unknown message type:%d %s:%d\n",
00293               message->message_type, __FILE__, __LINE__);
00294           message_Destroy(message);
00295       }
00296     } else {
00297 #ifdef MC_SECURITY
00298       if (mc_platform->enable_security) {
00299         if (
00300             (message->message_type != ENCRYPTED_DATA) &&
00301             (message->message_type != ENCRYPTION_INITIALIZE) &&
00302             (message->message_type != REQUEST_ENCRYPTION_INITIALIZE)
00303            )
00304         {
00305           message_queue_Add
00306             (
00307              mc_platform->asm_message_queue,
00308              message
00309             );
00310           continue;
00311         }
00312       }
00313 #endif
00314       message_Send
00315         (
00316          message
00317         );
00318       message_Destroy
00319         (
00320          message
00321         );
00322     }
00323   }
00324   return 0;
00325 }
00326 
00327 #ifndef _WIN32
00328   void*
00329 acc_Thread(void* arg)
00330 #else
00331   DWORD WINAPI
00332 acc_Thread( LPVOID arg )
00333 #endif
00334 {
00335   connection_p connection;
00336   message_p message;
00337   mtp_http_p mtp_http;
00338   mc_platform_p mc_platform = (mc_platform_p)arg;
00339   fipa_acl_envelope_p fipa_envelope;
00340   fipa_acl_message_p fipa_message;
00341   fipa_message_string_p fipa_message_string;
00342   int err;
00343   int i, j;
00344   agent_t* agent;
00345   
00346 
00347   while(1) {
00348     connection = NULL;
00349     message = NULL;
00350     mtp_http = NULL;
00351     MUTEX_LOCK(mc_platform->connection_queue->lock);
00352     MUTEX_LOCK(mc_platform->quit_lock);
00353     while (mc_platform->connection_queue->size == 0 && !mc_platform->quit) {
00354       MUTEX_UNLOCK(mc_platform->quit_lock);
00355       COND_WAIT(
00356           mc_platform->connection_queue->cond,
00357           mc_platform->connection_queue->lock
00358           );
00359       MUTEX_LOCK(mc_platform->quit_lock);
00360     }
00361     if 
00362       (
00363        mc_platform->connection_queue->size == 0 &&
00364        mc_platform->quit
00365       )
00366       {
00367         MUTEX_UNLOCK(mc_platform->quit_lock);
00368         MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00369         return 0;
00370       }
00371     MUTEX_UNLOCK(mc_platform->quit_lock);
00372     MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00373     
00374     MUTEX_LOCK(mc_platform->MC_signal_lock);
00375     mc_platform->MC_signal = MC_RECV_CONNECTION;
00376     COND_BROADCAST(mc_platform->MC_signal_cond);
00377     MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00378 
00379     
00380     MUTEX_LOCK(mc_platform->giant_lock);
00381     while (mc_platform->giant == 0) {
00382       COND_WAIT(
00383           mc_platform->giant_cond,
00384           mc_platform->giant_lock
00385           );
00386     }
00387     MUTEX_UNLOCK(mc_platform->giant_lock);
00388 
00389     
00390     connection = connection_queue_Pop(mc_platform->connection_queue);
00391     mtp_http = mtp_http_New();
00392     if ( mtp_http_InitializeFromConnection(mtp_http, connection ) )
00393     {
00394       connection_Destroy(connection);
00395       mtp_http_Destroy(mtp_http);
00396       continue;
00397     }
00398 
00399     switch(mtp_http->http_performative)
00400     {
00401       case HTTP_POST:
00402       case HTTP_PUT:
00403         
00404         
00405         if(
00406             !strcmp(mtp_http->target, "/ams") ||
00407             !strcmp( strrchr(mtp_http->target, (int)'/'), "/ams" )
00408             ) {
00409           message = message_New();
00410           
00411           message->message_body = (char*)malloc
00412             (
00413              sizeof(char) * 
00414              (strlen((char*)mtp_http->content->data)+1)
00415             );
00416           strcpy(message->message_body, (char*)mtp_http->content->data);
00417           message->xml_root = mxmlLoadString
00418             (
00419              NULL,
00420              message->message_body,
00421              MXML_NO_CALLBACK
00422             );
00423           if(message_xml_parse(message)) {
00424             fprintf(stderr, "Error parsing message. %s:%d\n",
00425                 __FILE__,__LINE__);
00426             message_Destroy(message);
00427             mtp_http_Destroy(mtp_http);
00428             continue;
00429           }
00430           mtp_http_Destroy(mtp_http);
00431           break;
00432         } else if 
00433           ( 
00434            !strcmp(mtp_http->target, "/acc") ||
00435            !strcmp( strrchr(mtp_http->target, (int)'/'), "/acc")
00436           ) {
00437           
00438 
00439           if (mtp_http->message_parts != 2) {
00440             fprintf(stderr, "Error parsing message. %s:%d\n",
00441                 __FILE__,__LINE__);
00442             mtp_http_Destroy(mtp_http);
00443             continue;
00444           }
00445           
00446           fipa_envelope = fipa_acl_envelope_New();
00447           err = fipa_envelope_Parse(fipa_envelope, (char*)mtp_http->content[0].data);
00448           if (err) {
00449             fprintf(stderr, "Error parsing message. %s:%d\n",
00450                 __FILE__, __LINE__);
00451             fipa_acl_envelope_Destroy(fipa_envelope);
00452             mtp_http_Destroy(mtp_http);
00453             continue;
00454           }
00455           
00456 
00457           for(i = 0; i < fipa_envelope->num_params; i++) {
00458             for(j = 0; j < fipa_envelope->params[i]->to->num; j++) {
00459               agent = agent_queue_SearchName(
00460                   mc_platform->agent_queue,
00461                   fipa_envelope->params[i]->to->fipa_agent_identifiers[j]->name
00462                   );
00463               if (agent != NULL) {
00464                 
00465                 fipa_message_string = fipa_message_string_New();
00466                 fipa_message_string->message = strdup((char*)mtp_http->content[1].data);
00467                 fipa_message_string->parse = fipa_message_string->message;
00468                 fipa_message = fipa_acl_message_New();
00469                 err = fipa_acl_Parse(fipa_message, fipa_message_string);
00470                 if (err) {
00471                   fipa_message_string_Destroy(fipa_message_string);
00472                   fipa_acl_message_Destroy(fipa_message);
00473                   fipa_acl_envelope_Destroy(fipa_envelope);
00474                   mtp_http_Destroy(mtp_http);
00475                   continue;
00476                 }
00477                 agent_mailbox_Post( agent->mailbox, fipa_message);
00478                 fipa_message_string_Destroy(fipa_message_string);
00479               }
00480             }
00481           }
00482           fipa_acl_envelope_Destroy(fipa_envelope);
00483           mtp_http_Destroy(mtp_http);
00484           continue;
00485         }
00486         else {
00487           
00488           fprintf(stderr, "Unsupported. %s:%d\n", __FILE__, __LINE__);
00489           mtp_http_Destroy(mtp_http);
00490         }
00491       default:
00492         fprintf(stderr, "unsupported http performative. %s:%d\n",
00493             __FILE__, __LINE__);
00494     }
00495 
00496     
00497     connection_Destroy(connection);
00498     switch(message->message_type) {
00499       case RELAY:
00500       case REQUEST:
00501       case SUBSCRIBE:
00502       case CANCEL:
00503       case N_UNDRSTD:
00504       case MOBILE_AGENT:
00505       case QUER_IF:
00506       case QUER_REF:
00507       case AGENT_UPDATE:
00508       case RETURN_MSG:
00509       case FIPA_ACL:
00510         message_queue_Add(mc_platform->message_queue, message);
00511         break;
00512 #ifdef MC_SECURITY
00513       case ENCRYPTED_DATA:
00514       case ENCRYPTION_INITIALIZE:
00515         if (mc_platform->enable_security) {
00516           message_queue_Add(mc_platform->asm_message_queue, message);
00517         } else {
00518           WARN("MC Security not enabled. Discarding message...");
00519           message_Destroy(message);
00520         }
00521         break;
00522       case REQUEST_ENCRYPTION_INITIALIZE:
00523         if (mc_platform->enable_security) {
00524           message_queue_Add(mc_platform->message_queue, message);
00525         } else {
00526           WARN("MC Security not enabled. Discarding message...");
00527           message_Destroy(message);
00528         }
00529         break;
00530 #endif
00531       default:
00532         fprintf(stderr, "Unknown message type:%d. Rejecting message.%s:%d\n",
00533             message->message_type,
00534             __FILE__, __LINE__ );
00535         free(message);
00536         break;
00537     }
00538   }
00539 }
00540 
00541   void
00542 acc_Start(mc_platform_p mc_platform)
00543 {
00544   acc_p acc = mc_platform->acc;
00545 #ifndef _WIN32
00546   pthread_attr_t attr;
00547   pthread_attr_init(&attr);
00548   if(mc_platform->stack_size[MC_THREAD_ACC] != -1) {
00549   pthread_attr_setstacksize
00550     (
00551      &attr,
00552      mc_platform->stack_size[MC_THREAD_ACC]
00553     );
00554   }
00555 #else
00556   int stack_size;
00557   if (mc_platform->stack_size[MC_THREAD_ACC] < 1) {
00558     
00559     stack_size = mc_platform->stack_size[MC_THREAD_ACC]+1; 
00560   } else {
00561     stack_size = mc_platform->stack_size[MC_THREAD_ACC];
00562   }
00563 #endif
00564   THREAD_CREATE
00565     ( 
00566      &acc->thread,
00567      acc_Thread,
00568      mc_platform 
00569     );
00570   THREAD_CREATE
00571     ( 
00572      &acc->message_handler_thread,
00573      acc_MessageHandlerThread,
00574      mc_platform
00575     );
00576   THREAD_CREATE
00577     (
00578      &acc->listen_thread,
00579      listen_Thread,
00580      mc_platform 
00581     );
00582 }
00583 
00584 #ifndef _WIN32
00585   void*
00586 listen_Thread(void* arg)
00587 #else
00588   DWORD WINAPI
00589 listen_Thread( LPVOID arg )
00590 #endif
00591 {
00592 #ifndef _WIN32
00593   int connectionsockfd; 
00594   int sockfd;
00595   struct sockaddr_in sktin;
00596   struct sockaddr_in peer_addr;
00597 #else
00598   SOCKET connectionsockfd;
00599   SOCKET sockfd;
00600   struct sockaddr_in sktin;
00601   struct sockaddr_in peer_addr;
00602 #endif
00603 
00604   connection_p connection;
00605   u_long connection_number;
00606   int connectionlen;
00607   mc_platform_p mc_platform = (mc_platform_p)arg;
00608 
00609   
00610   connection_number = 0;
00611 
00612   connectionlen = sizeof(struct sockaddr_in);
00613 
00614   
00615   sockfd = socket(PF_INET, SOCK_STREAM, 0);
00616   sktin.sin_family = AF_INET;
00617   sktin.sin_port = htons(mc_platform->port);
00618   sktin.sin_addr.s_addr = INADDR_ANY;
00619   memset(sktin.sin_zero, '\0', sizeof sktin.sin_zero);
00620   if (bind(sockfd, (struct sockaddr *)&sktin, sizeof(struct sockaddr))
00621       == -1) {
00622     fprintf(stderr, "bind() error. %s:%d\n",
00623         __FILE__, __LINE__ );
00624     exit(1);
00625   }
00626   listen(sockfd, BACKLOG);
00627 
00628   
00629   while(1)
00630   {
00631     
00632     MUTEX_LOCK(mc_platform->acc->waiting_lock);
00633     mc_platform->acc->waiting = 1;
00634     COND_BROADCAST(mc_platform->acc->waiting_cond);
00635     MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00636 #ifndef _WIN32
00637     if((connectionsockfd = accept(sockfd, 
00638             (struct sockaddr *)&peer_addr, 
00639             (socklen_t *)&connectionlen)) < 0)
00640 #else
00641       if((connectionsockfd = accept(sockfd,
00642               (struct sockaddr *)&peer_addr,
00643               (int*)&connectionlen)) == INVALID_SOCKET)
00644 #endif
00645       {
00646         fprintf(stderr, "ListenThread: accept error \n");
00647 #ifdef _WIN32
00648         printf("Error number: %d\n", WSAGetLastError() );
00649         continue;
00650 #endif
00651         continue;
00652       }
00653       else 
00654       {
00655         
00656         MUTEX_LOCK(mc_platform->acc->waiting_lock);
00657         mc_platform->acc->waiting = 0;
00658         COND_BROADCAST(mc_platform->acc->waiting_cond);
00659         MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00660         
00661 
00662         connection = (connection_p)malloc(sizeof(connection_t));
00663         CHECK_NULL(connection, exit(0););
00664         connection->connect_id = rand();
00665         connection->remote_hostname = NULL;
00666         connection->addr = peer_addr;
00667         connection->serverfd = sockfd;
00668         connection->clientfd = connectionsockfd;
00669 
00670         
00671         connection_queue_Add(mc_platform->connection_queue, connection);
00672       }
00673   }
00674 
00675   
00676 #ifndef _WIN32
00677   pthread_exit(0);
00678 #else
00679   ExitThread(0);
00680 #endif
00681 
00682   return 0;
00683 }