00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
#ifndef _WIN32
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/un.h>
#include <unistd.h>
#include <sys/time.h>
#include <netdb.h>
#include <pthread.h>
#else
#include <winsock.h>
#include <windows.h>
#include <time.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "include/acc.h"
#include "include/connection.h"
#include "include/data_structures.h"
#include "include/macros.h"
#include "include/mc_error.h"
#include "include/mc_platform.h"
#include "include/message.h"
#include "include/mtp_http.h"
#include "include/xml_parser.h"
#include "include/fipa_acl_envelope.h"
00059
/* Pending-connection queue length handed to listen() by listen_Thread(). */
#define BACKLOG 10
00061
00062 acc_p
00063 acc_Initialize(struct mc_platform_s* mc_platform)
00064 {
00065 acc_p acc;
00066 acc = (acc_p)malloc(sizeof(acc_t));
00067 acc->mc_platform = mc_platform;
00068
00069 acc->waiting = 0;
00070 acc->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00071 MUTEX_INIT(acc->waiting_lock);
00072 acc->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00073 COND_INIT(acc->waiting_cond);
00074
00075 return acc;
00076 }
00077
00078 int
00079 acc_Destroy(acc_p acc)
00080 {
00081 if(acc == NULL) {
00082 return MC_SUCCESS;
00083 }
00084 free(acc);
00085 acc = NULL;
00086 return MC_SUCCESS;
00087 }
00088
00089 #ifndef _WIN32
00090 void*
00091 acc_MessageHandlerThread(void* arg)
00092 #else
00093 DWORD WINAPI
00094 acc_MessageHandlerThread(LPVOID arg)
00095 #endif
00096 {
00097 mc_platform_p mc_platform = (mc_platform_p)arg;
00098 message_p message;
00099 agent_p agent;
00100 int mobile_agent_counter = 1;
00101 char* tmpstr;
00102 char* origname;
00103 int i;
00104
00105 while(1)
00106 {
00107 MUTEX_LOCK(mc_platform->message_queue->lock);
00108 MUTEX_LOCK(mc_platform->quit_lock);
00109 while(mc_platform->message_queue->size == 0 && !mc_platform->quit) {
00110 MUTEX_UNLOCK(mc_platform->quit_lock);
00111 COND_WAIT(
00112 mc_platform->message_queue->cond,
00113 mc_platform->message_queue->lock );
00114 MUTEX_LOCK(mc_platform->quit_lock);
00115 }
00116 if (mc_platform->message_queue->size == 0 && mc_platform->quit)
00117 {
00118 MUTEX_UNLOCK(mc_platform->quit_lock);
00119 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00120 return 0;
00121 }
00122
00123 MUTEX_UNLOCK(mc_platform->quit_lock);
00124 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00125 message = message_queue_Pop(mc_platform->message_queue);
00126 if (message == NULL) {
00127 printf("POP ERROR\n");
00128 continue;
00129 }
00130
00131 MUTEX_LOCK(mc_platform->MC_signal_lock);
00132 mc_platform->MC_signal = MC_RECV_MESSAGE;
00133 COND_BROADCAST(mc_platform->MC_signal_cond);
00134 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00135 MUTEX_LOCK(mc_platform->giant_lock);
00136 while(mc_platform->giant == 0) {
00137 COND_WAIT (
00138 mc_platform->giant_cond,
00139 mc_platform->giant_lock);
00140 }
00141 MUTEX_UNLOCK(mc_platform->giant_lock);
00142
00143
00144 if(message->to_address == NULL) {
00145
00146
00147 switch(message->message_type) {
00148 case MOBILE_AGENT:
00149 agent = agent_Initialize(
00150 mc_platform,
00151 message,
00152 mobile_agent_counter);
00153 if (agent != NULL) {
00154
00155 i = 1;
00156 if(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00157 origname = agent->name;
00158 while(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00159
00160
00161 tmpstr = (char*)malloc(sizeof(char) * strlen(origname) + 7);
00162 sprintf(tmpstr, "%s_%04d", origname, i);
00163 agent->name = tmpstr;
00164 i++;
00165 }
00166 fprintf(stderr, "Warning: Agent '%s' has been renamed to '%s'.\n",
00167 origname, agent->name);
00168 free(origname);
00169 }
00170 mobile_agent_counter++;
00171 agent_queue_Add(
00172 mc_platform->agent_queue,
00173 agent);
00174 }
00175 message_Destroy(message);
00176
00177 MUTEX_LOCK(mc_platform->MC_signal_lock);
00178 mc_platform->MC_signal = MC_RECV_AGENT;
00179 COND_BROADCAST(mc_platform->MC_signal_cond);
00180 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00181 MUTEX_LOCK(mc_platform->giant_lock);
00182 while(mc_platform->giant == 0) {
00183 COND_WAIT(mc_platform->giant_cond,
00184 mc_platform->giant_lock);
00185 }
00186 MUTEX_UNLOCK(mc_platform->giant_lock);
00187
00188 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00189 mc_platform->ams->run = 1;
00190 COND_BROADCAST(mc_platform->ams->runflag_cond);
00191 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00192 break;
00193 case FIPA_ACL:
00194
00195
00196 break;
00197 case RETURN_MSG:
00198
00199 agent = agent_Initialize(
00200 mc_platform,
00201 message,
00202 mobile_agent_counter);
00203 if (agent != NULL) {
00204 MUTEX_LOCK(agent->lock);
00205 agent->datastate->persistent = 1;
00206 agent->agent_status = MC_AGENT_NEUTRAL;
00207 MUTEX_UNLOCK(agent->lock);
00208 mobile_agent_counter++;
00209 agent_queue_Add(
00210 mc_platform->agent_queue,
00211 agent);
00212 }
00213 message_Destroy(message);
00214
00215 MUTEX_LOCK(mc_platform->MC_signal_lock);
00216 mc_platform->MC_signal = MC_RECV_RETURN;
00217 COND_BROADCAST(mc_platform->MC_signal_cond);
00218 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00219 MUTEX_LOCK(mc_platform->giant_lock);
00220 while(mc_platform->giant == 0) {
00221 COND_WAIT(
00222 mc_platform->giant_cond,
00223 mc_platform->giant_lock);
00224 }
00225 MUTEX_UNLOCK(mc_platform->giant_lock);
00226
00227 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00228 mc_platform->ams->run = 1;
00229 COND_BROADCAST(mc_platform->ams->runflag_cond);
00230 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00231 break;
00232
00233 #ifdef MC_SECURITY
00234 case ENCRYPTED_DATA:
00235 if (mc_platform->enable_security) {
00236 message_queue_Add
00237 (
00238 mc_platform->asm_message_queue,
00239 message
00240 );
00241 } else {
00242 WARN("mc_security not enabled. Discarding ENCRYPTED_DATA message.");
00243 message_Destroy(message);
00244 }
00245 break;
00246 case ENCRYPTION_INITIALIZE:
00247 if (mc_platform->enable_security) {
00248 asm_queue_Add
00249 (
00250 mc_platform->asm_queue,
00251 asm_node_Initialize(message, mc_platform->security_manager)
00252 );
00253
00254 MUTEX_LOCK(mc_platform->asm_message_queue->lock);
00255 COND_SIGNAL(mc_platform->asm_message_queue->cond);
00256 MUTEX_UNLOCK(mc_platform->asm_message_queue->lock);
00257 message_Destroy(message);
00258 } else {
00259 WARN("mc_security not enabled. Discarding ENCRYPTION_INITIALIZE message.");
00260 message_Destroy(message);
00261 }
00262 break;
00263 case REQUEST_ENCRYPTION_INITIALIZE:
00264 if (mc_platform->enable_security) {
00265 asm_SendEncryptionData(
00266 mc_platform->security_manager,
00267 message->from_address
00268 );
00269 message_Destroy(message);
00270 } else {
00271 WARN("mc_security not enabled. Discarding REQUEST_ENCRYPTION_INITIALIZE message.");
00272 message_Destroy(message);
00273 }
00274 break;
00275 #endif
00276 case RELAY:
00277 case REQUEST:
00278 case SUBSCRIBE:
00279 case CANCEL:
00280 case N_UNDRSTD:
00281 case QUER_IF:
00282 case QUER_REF:
00283 case AGENT_UPDATE:
00284 fprintf(stderr, "FIXME: Message type %d not processable.%s:%d\n",
00285 message->message_type, __FILE__, __LINE__ );
00286 message_Destroy(message);
00287 break;
00288 default:
00289 fprintf(stderr, "Unknown message type:%d %s:%d\n",
00290 message->message_type, __FILE__, __LINE__);
00291 message_Destroy(message);
00292 }
00293 } else {
00294 #ifdef MC_SECURITY
00295 if (mc_platform->enable_security) {
00296 if (
00297 (message->message_type != ENCRYPTED_DATA) &&
00298 (message->message_type != ENCRYPTION_INITIALIZE) &&
00299 (message->message_type != REQUEST_ENCRYPTION_INITIALIZE)
00300 )
00301 {
00302 message_queue_Add
00303 (
00304 mc_platform->asm_message_queue,
00305 message
00306 );
00307 continue;
00308 }
00309 }
00310 #endif
00311 message_Send
00312 (
00313 message
00314 );
00315 message_Destroy
00316 (
00317 message
00318 );
00319 }
00320 }
00321 return 0;
00322 }
00323
00324 #ifndef _WIN32
00325 void*
00326 acc_Thread(void* arg)
00327 #else
00328 DWORD WINAPI
00329 acc_Thread( LPVOID arg )
00330 #endif
00331 {
00332 connection_p connection;
00333 message_p message;
00334 mtp_http_p mtp_http;
00335 mc_platform_p mc_platform = (mc_platform_p)arg;
00336 fipa_acl_envelope_p fipa_envelope;
00337 fipa_acl_message_p fipa_message;
00338 fipa_message_string_p fipa_message_string;
00339 int err;
00340 int i, j;
00341 agent_t* agent;
00342
00343
00344 while(1) {
00345 connection = NULL;
00346 message = NULL;
00347 mtp_http = NULL;
00348 MUTEX_LOCK(mc_platform->connection_queue->lock);
00349 MUTEX_LOCK(mc_platform->quit_lock);
00350 while (mc_platform->connection_queue->size == 0 && !mc_platform->quit) {
00351 MUTEX_UNLOCK(mc_platform->quit_lock);
00352 COND_WAIT(
00353 mc_platform->connection_queue->cond,
00354 mc_platform->connection_queue->lock
00355 );
00356 MUTEX_LOCK(mc_platform->quit_lock);
00357 }
00358 if
00359 (
00360 mc_platform->connection_queue->size == 0 &&
00361 mc_platform->quit
00362 )
00363 {
00364 MUTEX_UNLOCK(mc_platform->quit_lock);
00365 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00366 return 0;
00367 }
00368 MUTEX_UNLOCK(mc_platform->quit_lock);
00369 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00370
00371 MUTEX_LOCK(mc_platform->MC_signal_lock);
00372 mc_platform->MC_signal = MC_RECV_CONNECTION;
00373 COND_BROADCAST(mc_platform->MC_signal_cond);
00374 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00375
00376
00377 MUTEX_LOCK(mc_platform->giant_lock);
00378 while (mc_platform->giant == 0) {
00379 COND_WAIT(
00380 mc_platform->giant_cond,
00381 mc_platform->giant_lock
00382 );
00383 }
00384 MUTEX_UNLOCK(mc_platform->giant_lock);
00385
00386
00387 connection = connection_queue_Pop(mc_platform->connection_queue);
00388 mtp_http = mtp_http_New();
00389 if ( mtp_http_InitializeFromConnection(mtp_http, connection ) )
00390 {
00391 connection_Destroy(connection);
00392 mtp_http_Destroy(mtp_http);
00393 continue;
00394 }
00395
00396 switch(mtp_http->http_performative)
00397 {
00398 case HTTP_POST:
00399 case HTTP_PUT:
00400
00401
00402 if(
00403 !strcmp(mtp_http->target, "/ams") ||
00404 !strcmp( strrchr(mtp_http->target, (int)'/'), "/ams" )
00405 ) {
00406 message = message_New();
00407
00408 message->message_body = (char*)malloc
00409 (
00410 sizeof(char) *
00411 (strlen((char*)mtp_http->content->data)+1)
00412 );
00413 strcpy(message->message_body, (char*)mtp_http->content->data);
00414 message->xml_root = mxmlLoadString
00415 (
00416 NULL,
00417 message->message_body,
00418 MXML_NO_CALLBACK
00419 );
00420 if(message_xml_parse(message)) {
00421 fprintf(stderr, "Error parsing message. %s:%d\n",
00422 __FILE__,__LINE__);
00423 message_Destroy(message);
00424 mtp_http_Destroy(mtp_http);
00425 continue;
00426 }
00427 mtp_http_Destroy(mtp_http);
00428 break;
00429 } else if
00430 (
00431 !strcmp(mtp_http->target, "/acc") ||
00432 !strcmp( strrchr(mtp_http->target, (int)'/'), "/acc")
00433 ) {
00434
00435
00436 if (mtp_http->message_parts != 2) {
00437 fprintf(stderr, "Error parsing message. %s:%d\n",
00438 __FILE__,__LINE__);
00439 mtp_http_Destroy(mtp_http);
00440 continue;
00441 }
00442
00443 fipa_envelope = fipa_acl_envelope_New();
00444 err = fipa_envelope_Parse(fipa_envelope, (char*)mtp_http->content[0].data);
00445 if (err) {
00446 fprintf(stderr, "Error parsing message. %s:%d\n",
00447 __FILE__, __LINE__);
00448 fipa_acl_envelope_Destroy(fipa_envelope);
00449 mtp_http_Destroy(mtp_http);
00450 continue;
00451 }
00452
00453
00454 for(i = 0; i < fipa_envelope->num_params; i++) {
00455 for(j = 0; j < fipa_envelope->params[i]->to->num; j++) {
00456 agent = agent_queue_SearchName(
00457 mc_platform->agent_queue,
00458 fipa_envelope->params[i]->to->fipa_agent_identifiers[j]->name
00459 );
00460 if (agent != NULL) {
00461
00462 fipa_message_string = fipa_message_string_New();
00463 fipa_message_string->message = strdup((char*)mtp_http->content[1].data);
00464 fipa_message_string->parse = fipa_message_string->message;
00465 fipa_message = fipa_acl_message_New();
00466 err = fipa_acl_Parse(fipa_message, fipa_message_string);
00467 if (err) {
00468 fipa_message_string_Destroy(fipa_message_string);
00469 fipa_acl_message_Destroy(fipa_message);
00470 fipa_acl_envelope_Destroy(fipa_envelope);
00471 mtp_http_Destroy(mtp_http);
00472 continue;
00473 }
00474 agent_mailbox_Post( agent->mailbox, fipa_message);
00475 fipa_message_string_Destroy(fipa_message_string);
00476 }
00477 }
00478 }
00479 fipa_acl_envelope_Destroy(fipa_envelope);
00480 mtp_http_Destroy(mtp_http);
00481 continue;
00482 }
00483 else {
00484
00485 fprintf(stderr, "Unsupported. %s:%d\n", __FILE__, __LINE__);
00486 mtp_http_Destroy(mtp_http);
00487 }
00488 default:
00489 fprintf(stderr, "unsupported http performative. %s:%d\n",
00490 __FILE__, __LINE__);
00491 }
00492
00493
00494 connection_Destroy(connection);
00495 switch(message->message_type) {
00496 case RELAY:
00497 case REQUEST:
00498 case SUBSCRIBE:
00499 case CANCEL:
00500 case N_UNDRSTD:
00501 case MOBILE_AGENT:
00502 case QUER_IF:
00503 case QUER_REF:
00504 case AGENT_UPDATE:
00505 case RETURN_MSG:
00506 case FIPA_ACL:
00507 message_queue_Add(mc_platform->message_queue, message);
00508 break;
00509 #ifdef MC_SECURITY
00510 case ENCRYPTED_DATA:
00511 case ENCRYPTION_INITIALIZE:
00512 if (mc_platform->enable_security) {
00513 message_queue_Add(mc_platform->asm_message_queue, message);
00514 } else {
00515 WARN("MC Security not enabled. Discarding message...");
00516 message_Destroy(message);
00517 }
00518 break;
00519 case REQUEST_ENCRYPTION_INITIALIZE:
00520 if (mc_platform->enable_security) {
00521 message_queue_Add(mc_platform->message_queue, message);
00522 } else {
00523 WARN("MC Security not enabled. Discarding message...");
00524 message_Destroy(message);
00525 }
00526 break;
00527 #endif
00528 default:
00529 fprintf(stderr, "Unknown message type:%d. Rejecting message.%s:%d\n",
00530 message->message_type,
00531 __FILE__, __LINE__ );
00532 free(message);
00533 break;
00534 }
00535 }
00536 }
00537
00538 void
00539 acc_Start(mc_platform_p mc_platform)
00540 {
00541 acc_p acc = mc_platform->acc;
00542 #ifndef _WIN32
00543 pthread_attr_t attr;
00544 pthread_attr_init(&attr);
00545 if(mc_platform->stack_size[MC_THREAD_ACC] != -1) {
00546 pthread_attr_setstacksize
00547 (
00548 &attr,
00549 mc_platform->stack_size[MC_THREAD_ACC]
00550 );
00551 }
00552 #else
00553 int stack_size;
00554 if (mc_platform->stack_size[MC_THREAD_ACC] < 1) {
00555
00556 stack_size = mc_platform->stack_size[MC_THREAD_ACC]+1;
00557 } else {
00558 stack_size = mc_platform->stack_size[MC_THREAD_ACC];
00559 }
00560 #endif
00561 THREAD_CREATE
00562 (
00563 &acc->thread,
00564 acc_Thread,
00565 mc_platform
00566 );
00567 THREAD_CREATE
00568 (
00569 &acc->message_handler_thread,
00570 acc_MessageHandlerThread,
00571 mc_platform
00572 );
00573 THREAD_CREATE
00574 (
00575 &acc->listen_thread,
00576 listen_Thread,
00577 mc_platform
00578 );
00579 }
00580
00581 #ifndef _WIN32
00582 void*
00583 listen_Thread(void* arg)
00584 #else
00585 DWORD WINAPI
00586 listen_Thread( LPVOID arg )
00587 #endif
00588 {
00589 #ifndef _WIN32
00590 int connectionsockfd;
00591 int sockfd;
00592 struct sockaddr_in sktin;
00593 struct sockaddr_in peer_addr;
00594 #else
00595 SOCKET connectionsockfd;
00596 SOCKET sockfd;
00597 struct sockaddr_in sktin;
00598 struct sockaddr_in peer_addr;
00599 #endif
00600
00601 connection_p connection;
00602 u_long connection_number;
00603 int connectionlen;
00604 mc_platform_p mc_platform = (mc_platform_p)arg;
00605
00606
00607 connection_number = 0;
00608
00609 connectionlen = sizeof(struct sockaddr_in);
00610
00611
00612 sockfd = socket(PF_INET, SOCK_STREAM, 0);
00613 sktin.sin_family = AF_INET;
00614 sktin.sin_port = htons(mc_platform->port);
00615 sktin.sin_addr.s_addr = INADDR_ANY;
00616 memset(sktin.sin_zero, '\0', sizeof sktin.sin_zero);
00617 if (bind(sockfd, (struct sockaddr *)&sktin, sizeof(struct sockaddr))
00618 == -1) {
00619 fprintf(stderr, "bind() error. %s:%d\n",
00620 __FILE__, __LINE__ );
00621 exit(1);
00622 }
00623 listen(sockfd, BACKLOG);
00624
00625
00626 while(1)
00627 {
00628
00629 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00630 mc_platform->acc->waiting = 1;
00631 COND_BROADCAST(mc_platform->acc->waiting_cond);
00632 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00633 #ifndef _WIN32
00634 if((connectionsockfd = accept(sockfd,
00635 (struct sockaddr *)&peer_addr,
00636 (socklen_t *)&connectionlen)) < 0)
00637 #else
00638 if((connectionsockfd = accept(sockfd,
00639 (struct sockaddr *)&peer_addr,
00640 (int*)&connectionlen)) == INVALID_SOCKET)
00641 #endif
00642 {
00643 fprintf(stderr, "ListenThread: accept error \n");
00644 #ifdef _WIN32
00645 printf("Error number: %d\n", WSAGetLastError() );
00646 continue;
00647 #endif
00648 continue;
00649 }
00650 else
00651 {
00652
00653 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00654 mc_platform->acc->waiting = 0;
00655 COND_BROADCAST(mc_platform->acc->waiting_cond);
00656 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00657
00658
00659 connection = (connection_p)malloc(sizeof(connection_t));
00660 CHECK_NULL(connection, exit(0););
00661 connection->connect_id = rand();
00662 connection->remote_hostname = NULL;
00663 connection->addr = peer_addr;
00664 connection->serverfd = sockfd;
00665 connection->clientfd = connectionsockfd;
00666
00667
00668 connection_queue_Add(mc_platform->connection_queue, connection);
00669 }
00670 }
00671
00672
00673 #ifndef _WIN32
00674 pthread_exit(0);
00675 #else
00676 ExitThread(0);
00677 #endif
00678
00679 return 0;
00680 }