Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

priority_queue.c

Go to the documentation of this file.
00001 /*
00002 ** priority_queue.c
00003 **
00004 ** This implements a priority transfer queue.
00005 **
00006 ** Priority levels are defined in the .h file.
00007 **
00008 ** The actual sorting is of the pointers in array queue->sorted.
00009 ** Bear in mind these are pointers to the PRI_QUEUE_ENTRY elements in
00010 ** queue->entries.
00011 ** 
00012 ** The queue must first be initialized to a specified size
00013 ** using the init_pri_queue function.  After that, the queue
00014 ** is ready for use through add_item() and pop_next_item().
00015 ** (pop_next_item always returns the next item of the highest
00016 ** priority).
00017 **
00018 ** SEE priority_queue.h for FUNCTION DESCRIPTIONS and RETURN CODES.
00019 **
00020 ** CAUTION: This queue manages its own mutex, and thus the function
00021 **          calls do not need to be wrapped in such by the caller.
00022 */
00023 #include <stdio.h>
00024 #include <stdlib.h>
00025 #include <platform.h>
00026 #include <earthworm.h>
00027 #include <earthworm_complex_funcs.h>
00028 #include <priority_queue.h>
00029 
00030 /* #define LOG_DEBUG 1  */
00031 
00032 #ifdef LOG_DEBUG
00033 /* #define DEBUG_DETAILS 1 */
00034 #include <stdio.h>
00035 #endif
00036 
00037 /* Private shift-direction codes for add_item(): each value is also the
00038 ** index step its single shared shift loop adds per pass (0 = no shift).
00039 */
00040 #define EW_PRI_SHIFT_DOWN -1
00041 #define EW_PRI_SHIFT_NONE  0
00042 #define EW_PRI_SHIFT_UP    1
00043 
00044 
00045 /*********************************************************
00046 **
00047 **********************************************************/
00048 int init_pri_queue( PRI_QUEUE   * p_queue
00049                   , unsigned long p_max_items
00050                   , unsigned long p_max_item_size )
00051 {
00052    int r_status = EW_PRI_RETNORMAL;
00053 
00054    if ( p_queue == NULL )
00055    {
00056       r_status = EW_PRI_RETQNULL;   
00057    }
00058    else
00059    {
00060       /*
00061       ** Initialize structure in case deallocation needed later
00062       */
00063       p_queue->queuesize = 0;
00064       p_queue->itemsused = 0;
00065       p_queue->sorted    = NULL;
00066       p_queue->entries   = NULL;
00067       p_queue->data      = NULL;
00068    }
00069 
00070    if ( p_max_items < 1 )
00071    {
00072       r_status = EW_PRI_RETPARAM;
00073    }
00074 
00075    if ( r_status == 0 )
00076    {
00077       unsigned long _idx;
00078 
00079       p_queue->queuesize = p_max_items;
00080       p_queue->itemmaxsize = p_max_item_size;
00081 
00082       /*
00083       ** Allocate space for the sorted pointers to the object storage
00084       */
00085       if ( r_status == EW_PRI_RETNORMAL )
00086       {
00087          if ( ( p_queue->sorted = (PRI_QUEUE_ENTRY **) malloc( sizeof(PRI_QUEUE_ENTRY*) * p_max_items) ) ==  NULL )
00088          {
00089             r_status = EW_PRI_RETMALLOC;
00090          }
00091       }
00092 
00093       /*
00094       ** Allocate space for the object storage
00095       */
00096       if ( r_status == EW_PRI_RETNORMAL )
00097       {
00098          if ( ( p_queue->entries = (PRI_QUEUE_ENTRY *) malloc( sizeof(PRI_QUEUE_ENTRY) * p_max_items) ) ==  NULL )
00099          {
00100             r_status = EW_PRI_RETMALLOC;
00101          }
00102       }
00103 
00104 
00105       /*
00106       ** Allocate space for the object storage
00107       */
00108       if ( r_status == EW_PRI_RETNORMAL )
00109       {
00110          if ( ( p_queue->data = (char *) malloc(sizeof(char) * p_max_item_size * p_max_items) ) ==  NULL )
00111          {
00112             r_status = EW_PRI_RETMALLOC;
00113          }
00114       }
00115       
00116 
00117       /*
00118       ** Initialize the priority containers,
00119       ** Load the container pointers into the sorted list
00120       */
00121       if ( r_status == EW_PRI_RETNORMAL )
00122       {
00123          PRI_QUEUE_ENTRY  * _entry  = p_queue->entries;
00124          PRI_QUEUE_ENTRY ** _sorted = p_queue->sorted;
00125          for ( _idx = 0 ; _idx < p_max_items ; _idx++, _entry++, _sorted++ ) 
00126          {
00127             /* set priority to indicate no valid data */
00128             _entry->pri = EW_PRIORITY_NONE;
00129             _entry->length = 0;
00130             /* assign data storage location to queue entry */
00131             _entry->data = (char *)(p_queue->data + (_idx * p_max_item_size));
00132             /* grab storage pointer for sorted array */
00133             *_sorted = _entry;
00134          }
00135       }
00136 
00137 
00138       /*
00139       ** Initialize priority index locations to point to the first (zero)
00140       ** location in the sorted array
00141       */
00142       if ( r_status == EW_PRI_RETNORMAL )
00143       {
00144          for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ ) {
00145             p_queue->insert_indices[_idx] = 0;
00146          }
00147       }
00148       CreateSpecificMutex( &(p_queue->lock) );
00149    }
00150 
00151 
00152    if ( r_status != EW_PRI_RETNORMAL )
00153    {
00154       release_pri_queue( p_queue );
00155    }
00156 #ifdef LOG_DEBUG
00157    else
00158    {
00159       char dbgstr[120];
00160             sprintf( dbgstr
00161                    , "DEBUG init: %d  %d  %d\n"
00162                    , p_queue->sorted, p_queue->entries, p_queue->data
00163                    );
00164             logit( "t" 
00165                  , dbgstr
00166                  , "export_pri"
00167                  , "MOD_EXPORT_SCN"
00168                  );
00169    }
00170 #endif
00171    return r_status;
00172 }
00173 
00174 
00175 /*********************************************************
00176 **
00177 **********************************************************/
00178 void release_pri_queue( PRI_QUEUE * p_queue )
00179 {
00180    p_queue->queuesize = 0;
00181    p_queue->itemsused = 0;
00182 
00183    CloseSpecificMutex( &p_queue->lock );
00184 
00185    if ( p_queue->sorted != NULL )
00186    {
00187       free( p_queue->sorted );
00188       p_queue->sorted = NULL;
00189    }
00190 
00191    if ( p_queue->entries != NULL )
00192    {
00193       free( p_queue->entries );
00194       p_queue->entries = NULL;
00195    }
00196 
00197    if ( p_queue->data != NULL )
00198    {
00199       free( p_queue->data );
00200       p_queue->data = NULL;
00201    }
00202 }
00203 
00204 /*********************************************************
00205 **
00206 **********************************************************/
00207 int getNumOfElementsInQueue( PRI_QUEUE * p_queue )
00208 {
00209    if ( p_queue == NULL )
00210    {
00211       return 0;
00212    }
00213    return p_queue->itemsused;
00214 }
00215 
00216 /*********************************************************
00217 **
00218 **********************************************************/
00219 int add_item( PRI_QUEUE * p_queue
00220             , EW_PRIORITY p_priority
00221             , MSG_LOGO    p_logo
00222             , long        p_size
00223             , PRI_DATA    p_data
00224             )
00225 {
00226 #ifdef LOG_DEBUG
00227    char dbgstr[120];
00228 #endif
00229 
00230    int r_status = EW_PRI_RETNORMAL;
00231 
00232    if ( p_queue == NULL )
00233    {
00234       return( EW_PRI_RETQNULL );
00235    }
00236 
00237    if ( p_queue->queuesize < 1 )
00238    {
00239       return( EW_PRI_RETNOTREADY );
00240    }
00241 
00242    RequestSpecificMutex( &p_queue->lock );
00243 
00244    if ( p_size < 0 || p_queue->itemmaxsize < p_size )
00245    {
00246       return ( EW_PRI_RETMSGSIZE );
00247    }
00248    else
00249    {
00250       EW_PRIORITY _usePri = p_priority;
00251 
00252       long _queuesize = p_queue->queuesize
00253          , _ins_index = p_queue->queuesize - 1 /* array location to insert new object */
00254          , _src_index   /* array location to obtain container for insertion */
00255          ;
00256       PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
00257       int _shift_direction = EW_PRI_SHIFT_NONE
00258          , _shift_stt
00259          , _shift_end
00260          , _idx             /* work index for loops */
00261          ;
00262       int _doSwap  = 0  /* swap old object for new */
00263       , _updateIdx = 0  /* update the priority insert location indices */
00264       ;
00265 
00266       if ( p_priority < EW_PRIORITY_MIN || EW_PRIORITY_MAX < p_priority )
00267       {
00268          /* priority out of range, fall back to default */
00269          _usePri = EW_PRIORITY_DEF;
00270          /* keep this return status unless worse one arises */
00271          r_status = EW_PRI_RETBADPRI;
00272       }
00273 
00274 
00275       if ( p_queue->insert_indices[p_priority] < _queuesize )
00276       {
00277          /*
00278          ** The insert location is within the array
00279          */
00280          _ins_index = p_queue->insert_indices[p_priority];
00281 
00282 
00283          /* Check state of item at insert location */
00284          _wrk_pointer = p_queue->sorted[_ins_index];
00285 
00286          if (   _wrk_pointer->pri == 0           /* container at insert location unused */
00287              || _ins_index == (_queuesize - 1) /* insert location is last position in the array */
00288             )
00289          {
00290             /*
00291             ** Use the item at the insert location as the source container
00292             **
00293             ** _shift_direction = 0;
00294             */
00295             _doSwap = 1;
00296             _updateIdx = 1;
00297             if ( p_queue->itemsused < _queuesize )
00298             {
00299                p_queue->itemsused++;  /* increment number of items used */
00300             }
00301          }
00302          else
00303          {  /*
00304             ** the insert location is within the array space
00305             ** the item at the insert location is used
00306             ** the insert location is not the last position in the array
00307             */
00308             
00309             /* check if the array is full */
00310             if ( p_queue->itemsused < _queuesize )
00311             {
00312                /*
00313                ** The array is not full, therefore there is at least one
00314                ** unused item in the array.
00315                **
00316                ** Shift items down from insert point to first unused item
00317                */
00318                _src_index = p_queue->itemsused++;  /* increment number of items used */
00319 
00320                /* GET THE SOURCE CONTAINER */
00321                _wrk_pointer = p_queue->sorted[_src_index];
00322 
00323                /* SHIFT ITEMS DOWN */
00324                _shift_end = _ins_index;
00325                _shift_stt = _src_index;
00326                _shift_direction = EW_PRI_SHIFT_DOWN;
00327 
00328                _doSwap = 1;
00329                _updateIdx = 1;
00330             }
00331             else
00332             {
00333                /*
00334                ** The array is full.
00335                **
00336                ** Since we've already ascertained that the insert point is within the array
00337                ** we know that the item to be inserted is less than the highest priority
00338                ** stored in the array because there is at least one item of a higher priority
00339                ** following the items at the priority of the new item.
00340                **
00341                ** Get the priority of the last item in the array, use the earliest item
00342                ** of that priority to make space for the new item.
00343                ** This equates to shifting items from the insert location down to the
00344                ** insert location of the priority before the last (the first item of
00345                ** the last priority in the array).
00346                */
00347                _src_index = p_queue->insert_indices[ p_queue->sorted[_queuesize - 1]->pri - 1 ];
00348 
00349                /* GET THE SOURCE CONTAINER */
00350                _wrk_pointer = p_queue->sorted[_src_index];
00351 
00352                /* SHIFT ITEMS DOWN */
00353                _shift_end = _ins_index;
00354                _shift_stt = _src_index;
00355                _shift_direction = EW_PRI_SHIFT_DOWN;
00356 
00357                _doSwap = 1;
00358                _updateIdx = 1;
00359 
00360                r_status = EW_PRI_RETDROP;
00361             }
00362          }
00363       }  /*  insert_indices[p_priority] < _queuesize  */
00364       else
00365       {  /*  insert_indices[p_priority] == _queuesize
00366          **
00367          ** Insert index for the priority of the new item is past the end of
00368          ** the array.  (The array is full.)
00369          **
00370          ** Therefore, this item can only be inserted if this item has the
00371          ** same priority as the last item in the array and the priority
00372          ** of the last item in the array is not of the minimum priority.
00373          **
00374          ** In such a case, will drop the earliest item of the same priority,
00375          ** shift all other of the priority up, and tack the new item on at the end.
00376          **
00377          ** (That way, if the bandwidth opens up again and no others of this
00378          ** priority are dropped, we can send a continuous stream of this
00379          ** priority from the earliest point at which there was sufficient
00380          ** bandwidth).
00381          */
00382          if (   p_queue->sorted[_queuesize - 1]->pri != EW_PRIORITY_MIN
00383              && p_queue->insert_indices[p_priority - 1] < _queuesize
00384             )
00385          {
00386             /*
00387             ** The insert point for the priority immediately prior to this one
00388             ** is before the end of the array, indicating that this item is of
00389             ** the same priority as the last priority in the array.
00390             */
00391             /* GET THE SOURCE CONTAINER */
00392             _wrk_pointer = p_queue->sorted[ p_queue->insert_indices[p_priority - 1] ];
00393 
00394             /* SHIFT ITEMS UP */
00395             _shift_stt = p_queue->insert_indices[p_priority - 1];
00396             _shift_end = _queuesize - 1;
00397             _shift_direction = EW_PRI_SHIFT_UP;
00398 
00399             _doSwap = 1;
00400             _updateIdx = 1;
00401 
00402             r_status = EW_PRI_RETDROP;
00403 
00404          }
00405          else
00406          {
00407             r_status = EW_PRI_RETREJECT;
00408          }
00409       }
00410 
00411 
00412       if ( _doSwap == 1 )
00413       {
00414 
00415          _wrk_pointer->pri           = p_priority;
00416          (_wrk_pointer->logo).type   = p_logo.type;
00417          (_wrk_pointer->logo).mod    = p_logo.mod;
00418          (_wrk_pointer->logo).instid = p_logo.instid;
00419          _wrk_pointer->length        = p_size;
00420          /*
00421          ** copy message text
00422          **/
00423          if ( _wrk_pointer->data != NULL ) /* avoid invalid write at termination */
00424          {
00425             memcpy( _wrk_pointer->data, p_data, p_size );
00426          }
00427       }
00428 
00429 
00430       if ( _shift_direction != EW_PRI_SHIFT_NONE && p_queue->sorted != NULL )
00431       {
00432          PRI_QUEUE_ENTRY * _temp = p_queue->sorted[_shift_stt];
00433 
00434 #ifdef DEBUG_DETAILS
00435          sprintf( dbgstr
00436                 , "DEBUG add_item() shifting %d -> %d (%d)\n"
00437                 , _shift_stt, _shift_end, _shift_direction
00438                 );
00439          logit( "t" 
00440               , dbgstr
00441               , "export_pri"
00442               , "MOD_EXPORT_SCN"
00443               );
00444 #endif
00445          for ( _idx = _shift_stt
00446              ; ( _shift_direction == EW_PRI_SHIFT_UP ? _idx < _shift_end : _idx > _shift_end )
00447              ; _idx += _shift_direction
00448              )
00449          {
00450 #ifdef DEBUG_DETAILS
00451             sprintf( dbgstr
00452                    , "DEBUG shifting [%d] = [%d]\n"
00453                    , _idx, _idx + _shift_direction
00454                    );
00455             logit( "t" 
00456                  , dbgstr
00457                  , "export_pri"
00458                  , "MOD_EXPORT_SCN"
00459                  );
00460 #endif
00461             p_queue->sorted[ _idx ] = p_queue->sorted[ _idx + _shift_direction ];
00462          }
00463          p_queue->sorted[_shift_end] = _temp;
00464       }
00465 
00466 
00467       if ( _updateIdx == 1 )
00468       {
00469          /* Adjust insert indices */
00470          for ( _idx = p_priority ; _idx < EW_PRIORITY_COUNT ; _idx++ )
00471          {
00472             if ( ( p_queue->insert_indices[_idx] + 1 ) <= _queuesize )
00473             {
00474                p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] + 1;
00475             }
00476          }
00477       }
00478    }
00479 
00480    ReleaseSpecificMutex( &p_queue->lock );
00481 
00482    return r_status;
00483 }
00484 
00485 
00486 /*********************************************************
00487 **
00488 **********************************************************/
00489 int peek_next_item( PRI_QUEUE   * p_queue
00490                   , MSG_LOGO    * p_logoptr
00491                   , EW_PRIORITY * p_priptr
00492                   )
00493 {
00494    int r_status = EW_PRI_NOITEM;  /* no items in queue */
00495 
00496    if ( p_queue == NULL )
00497    {
00498       return( EW_PRI_RETQNULL );
00499    }
00500 
00501    if ( p_queue->queuesize < 1  )
00502    {
00503       return( EW_PRI_RETNOTREADY );
00504    }
00505 
00506    RequestSpecificMutex( &p_queue->lock );
00507 
00508    if ( p_queue->sorted != NULL )
00509    {
00510       PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
00511       /*
00512       ** Check the priority of the object referenced by the
00513       ** first pointer in the array.
00514       **
00515       ** If the priority is none, then there are no items in the list
00516       */
00517       _wrk_pointer = p_queue->sorted[0];
00518 
00519       if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
00520       {  
00521          /* copy the logo */
00522          p_logoptr->type   = (_wrk_pointer->logo).type;
00523          p_logoptr->mod    = (_wrk_pointer->logo).mod ;
00524          p_logoptr->instid = (_wrk_pointer->logo).instid;
00525   
00526          *p_priptr  = _wrk_pointer->pri;  /* message priority */
00527 
00528          r_status = EW_PRI_RETNORMAL;
00529       }
00530    }
00531 
00532    ReleaseSpecificMutex( &p_queue->lock );
00533 
00534    return r_status;
00535 }
00536 
00537 
00538 /*********************************************************
00539 **
00540 **********************************************************/
00541 int pop_next_item( PRI_QUEUE * p_queue
00542                  , MSG_LOGO  * p_logoptr
00543                  , long      * p_sizeptr
00544                  , PRI_DATA    p_data
00545                  )
00546 {
00547 #ifdef LOG_DEBUG
00548    char dbgstr[120];
00549 #endif
00550    
00551    int r_status = EW_PRI_NOITEM;  /* no items in queue */
00552 
00553    PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
00554    unsigned long _idx
00555                , _sz
00556                ;
00557 
00558    if ( p_queue == NULL )
00559    {
00560       return( EW_PRI_RETQNULL );
00561    }
00562 
00563    if ( p_queue->queuesize < 1  )
00564    {
00565       return( EW_PRI_RETNOTREADY );
00566    }
00567 
00568    RequestSpecificMutex( &(p_queue->lock) );
00569 
00570 
00571    /*
00572    ** Check the priority of the object referenced by the
00573    ** first pointer in the array.
00574    **
00575    ** If the priority is none, then there are no items in the list
00576    */
00577    _wrk_pointer = p_queue->sorted[0];
00578 
00579    if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
00580    { 
00581       /* copy the logo */
00582       p_logoptr->type   = (_wrk_pointer->logo).type;
00583       p_logoptr->mod    = (_wrk_pointer->logo).mod ;
00584       p_logoptr->instid = (_wrk_pointer->logo).instid;
00585   
00586       *p_sizeptr  = _wrk_pointer->length;  /* message length */
00587 
00588       /*
00589       ** copy text of message to caller's memory 
00590       */
00591       memcpy( p_data, _wrk_pointer->data, (size_t)(_wrk_pointer->length) );
00592 
00593       /* shift pointers to the other queue entries up */
00594       for ( _idx = 0, _sz = p_queue->queuesize - 1 ; _idx < _sz ; _idx++ )
00595       {
00596          p_queue->sorted[_idx] = p_queue->sorted[_idx + 1];
00597       }
00598 
00599       /* clear the queue entry item */
00600       _wrk_pointer->pri    = EW_PRIORITY_NONE;
00601       _wrk_pointer->length = 0;
00602 
00603       /*
00604       ** Put the pointer to the now empty queue entry
00605       ** at the end of the sorted array
00606       */
00607       p_queue->sorted[p_queue->queuesize - 1] = _wrk_pointer;
00608 
00609       /* update the insert indices */
00610       for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ )
00611       {
00612          if ( 0 <= ( p_queue->insert_indices[_idx] - 1 ) )
00613          {
00614             p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] - 1;
00615          }
00616       }
00617 
00618       /* update the used count */
00619       (p_queue->itemsused)--;
00620       
00621       r_status = EW_PRI_RETNORMAL;
00622    }
00623 
00624 #ifdef DEBUG_DETAILS
00625    sprintf( dbgstr
00626           , "%%s(%%s): DEBUG pop_next_item() releasing mutex; q state: %d of %d\n"
00627           , p_queue->itemsused
00628           , p_queue->queuesize
00629           );
00630    logit( "t" 
00631         , dbgstr
00632         , "export_pri"
00633         , "MOD_EXPORT_SCN"
00634         );
00635 #endif
00636 
00637    ReleaseSpecificMutex( &p_queue->lock );
00638 
00639    return r_status;
00640 }
00641 
00642 
00643 
00644 
00645 
00646 

Generated on Tue May 6 09:16:07 2003 for Earthworm Libs by doxygen1.3-rc3