Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

mem_circ_queue.c

Go to the documentation of this file.
00001 
00002 /*
00003  *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
00004  *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
00005  *
00006  *    $Id: mem__circ__queue_8c-source.html 2161 2006-05-19 16:55:03Z paulf $
00007  *
00008  *    Revision history:
00009  *     $Log$
00009  *     Revision 1.1  2006/05/19 16:55:02  paulf
00009  *     first inclusion
00009  *
00010  *     Revision 1.2  2001/04/12 03:57:44  lombard
00011  *     Added functions dumpqueue, undumpqueue and peekNextElement
00012  *     to allow saving queue to disk and reloading.
00013  *
00014  *     Revision 1.1  2000/02/14 18:51:48  lucky
00015  *     Initial revision
00016  *
00017  *
00018  */
00019 
00020 /* 
00021 This is the son of 'queue_max_size.c', which is the son of queue, which Will got from
00022 Kelley and Pohl. These were clever linked list schemes which allocated as needed.
00023 This was too scary: the system could run out of memory and fail after running for
00024 a long time.
00025         So, out with the cleverness, and in with brute force: this version pre-
00026 allocates the max amount of memory it will ever use at startup, and does the
00027 buffering in an array of message slots. The buffering is circular, as before:
00028 When we run out of message slots, we overwrite the oldest message.
00029 alex 2/8/98
00030 */
00031 
00032 #include <errno.h>
00033 #include <stdio.h>
00034 #include <string.h>
00035 #include <stdlib.h>
00036 #include <earthworm.h>
00037 #include <time.h>
00038 #include <mem_circ_queue.h>
00039 
00040 /* Internal Function Prototypes */
00041 static void inc_circular( QUEUE* q, Q_POS * pQPos);
00042 static void dec_circular( QUEUE* q, Q_POS * pQPos);
00043 
00044 /****************************************************************
00045 *                       initqueue                               *
00046 *  allocates the three queue regions                            *
00047 *       returns:                                                *
00048 *               -1 if we couldn't get the memory                *
00049 *               -2 if the amount implied is bigger than         *
00050 *                  a long                                       *
00051 *****************************************************************/
00052 int initqueue( QUEUE* q, unsigned long maxElements, 
00053                unsigned long elementMaxSize )
00054 {
00055   unsigned long         temp;
00056   QUEUE_ENTRY*  pQE;
00057   unsigned long StartingOffset, malloc_size;
00058 
00059   /* First allocate the QUEUE structure
00060    *************************************/
00061   /* this contains the pointers to the start of the message descriptor array
00062      and the message storage area */
00063   if (q == (QUEUE*)NULL) return(-1);
00064 
00065   /* save some arguments to local variables
00066    *****************************************/
00067   q->MyMaxElements = maxElements;
00068   q->MyMaxSize = elementMaxSize;
00069   q->NumOfElements = 0;
00070 
00071   /* Is the total number of bytes within an unsigned long?
00072    * I really hope so, because otherwise this is
00073    * going to be a 4GB queue.
00074    ********************************************/
00075   temp =  q->MyMaxElements * q->MyMaxSize;
00076   temp = temp/maxElements;
00077   if (temp != elementMaxSize) return (-2);  
00078   
00079   /* Allocate the array of queue entries
00080    ***************************************************/
00081   malloc_size= maxElements * (sizeof(QUEUE_ENTRY) + elementMaxSize);
00082   q->pQE = (QUEUE_ENTRY*)malloc(malloc_size);
00083   if (q->pQE == (QUEUE_ENTRY*)NULL) return(-1);
00084   
00085   /* Set DATA pointers for individual elements
00086    **********************************************************************/
00087   pQE= q->pQE;   /* element of the QUEUE structure which points to the array of buffer descriptors */
00088   
00089   StartingOffset=((unsigned long) pQE) + (maxElements * sizeof(QUEUE_ENTRY));
00090    
00091   for (temp=0; temp< maxElements; temp++)
00092   {
00093     /*  For each element, set the data pointer */
00094     pQE->d = (char *)(StartingOffset + (elementMaxSize * temp));
00095     pQE++;
00096   }
00097 
00098   /* set in and out pointers to indicate empty queue
00099    *************************************************/
00100   q->first=q->last=0;
00101   /* in  = out = first descriptor */
00102 
00103   return(0);
00104 }
00105 
00106 /***********************************************************************
00107  *                              dequeue                                *
00108  *                                                                     *
00109  *  Copies oldest message and its logo into caller's space.            *
00110  *  Returns 0 if no error.                                             *
00111  *  Returns -1 if the queue is empty.                                  *
00112  ***********************************************************************/
00113 int dequeue( QUEUE *q, DATA x, long* size, MSG_LOGO* userLogoPtr )
00114 {
00115   QUEUE_ENTRY * pFQE; /* Pointer to first(oldest) queue entry */
00116 
00117   if (!(q->NumOfElements) ) /* then the queue is empty */
00118     return( -1 );
00119 
00120   /* set shortcut for first queue element */
00121   pFQE=&(q->pQE[q->first]);
00122   
00123   /* copy text of message to caller's memory 
00124    ******************************************/
00125   memcpy( x, pFQE->d, (size_t)(pFQE->length) );
00126   
00127   /* copy the logo
00128    ***************/
00129   userLogoPtr->type   = (pFQE->queueLogo).type;
00130   userLogoPtr->mod    = (pFQE->queueLogo).mod ;
00131   userLogoPtr->instid = (pFQE->queueLogo).instid;
00132   
00133   *size  = pFQE->length;  /* message length */
00134   
00135   /* move the out pointer
00136    **********************/
00137   inc_circular(q, &(q->first) );   
00138   
00139   q->NumOfElements--;  /* we're dequeueing an element, so there is one less */
00140   
00141   return( 0 );
00142 }
00143 
00144 
00145 /***********************************************************************
00146  *                              enqueue                                *
00147  *                                                                     *
00148  *  moves message into circular buffer.                                *
00149  *  Copies string x to the new element.                                *
00150  *  Returns 0 if no error.                                             *
00151  *  Returns -1 message too big                                         *
00152  *  Returns -3 if we clobbered an unsent message (stepped on our tail) *
00153  ***********************************************************************/
00154 int enqueue( QUEUE *q, DATA x, long size, MSG_LOGO userLogo )
00155 {
00156   int ourRet;
00157   QUEUE_ENTRY * pLQE;  /* pointer to last(youngest) entry in queue */
00158   
00159   ourRet=0 ; /* presume all will go well */
00160   
00161   /* if message is larger than max, do nothing and return error
00162    ************************************************************/
00163   if (size > q->MyMaxSize) return(-1); 
00164   
00165   /* set shortcut for last queue element */
00166   pLQE=&((q->pQE)[q->last]);
00167   
00168   /* copy message text
00169    *******************/
00170   memcpy( pLQE->d, x, size);
00171   
00172   /* copy logo and size
00173    *********************/
00174   (pLQE->queueLogo).type   = userLogo.type;
00175   (pLQE->queueLogo).mod    = userLogo.mod;
00176   (pLQE->queueLogo).instid = userLogo.instid;
00177   (pLQE)->length = size;
00178   
00179 
00180   /* move to next buffer
00181    *********************/
00182   inc_circular(q, &(q->last) );   
00183   if( q->NumOfElements == q->MyMaxElements ) /* then the queue is full */
00184   { /* in that case we just overwrote the contents of the first(oldest)
00185        message in the queue, so 'drive' the outpointer forward one */
00186     inc_circular(q, &(q->first) );
00187     q->NumOfElements--;  /* we're overwriting an element, so there is one less */
00188     ourRet = -3;   /* meaning we're over-writing messages */
00189   }
00190 
00191   q->NumOfElements++;  /* we're adding an element, so there is one more */
00192 
00193   return( ourRet );
00194 }
00195 
00196 /* to move supplied pointer to the next buffer descriptor structure 
00197  * - in a circular way */
00198 static void inc_circular( QUEUE* q, Q_POS * pQPos)
00199 {
00200   (*pQPos)++;
00201   if(*pQPos >= q->MyMaxElements)
00202     *pQPos=0;
00203 }
00204 
00205 /* to move supplied pointer to the prev buffer descriptor structure 
00206  * - in a circular way */
00207 static void dec_circular( QUEUE* q, Q_POS * pQPos)
00208 {
00209   (*pQPos)--;
00210   if((*pQPos) <  0)
00211     *pQPos= (q->MyMaxElements)-1;
00212 }
00213 
00214 
00215 int getNumOfElementsInQueue( QUEUE * q)
00216 {
00217   return(q->NumOfElements);
00218 }
00219 
00220 Q_POS getNext(QUEUE * q, int QueuePosition)
00221 {
00222   Q_POS temp;
00223   
00224   temp=QueuePosition;
00225   if(temp < 0 || temp >= q->MyMaxElements)
00226     return(-1);
00227   else
00228   {
00229     inc_circular(q,&temp);
00230     return(temp);
00231   }
00232 }
00233 
00234 
00235 Q_POS getPrev(QUEUE * q, int QueuePosition)
00236 {
00237   Q_POS temp;
00238   
00239   temp=QueuePosition;
00240   if(temp < 0 || temp >= q->MyMaxElements)
00241     return(-1);
00242   else
00243   {
00244     dec_circular(q,&temp);
00245     return(temp);
00246   }
00247 }
00248 
00249 Q_POS getPosFirst(QUEUE * q)
00250 {
00251   return(q->first);
00252 }
00253 
00254 
00255 Q_POS getPosLast(QUEUE * q)
00256 {
00257   return(q->last);
00258 }
00259 
00260 /*
00261  * dumpqueue: dump the contents of a queue to a file to be used for restarts
00262  *     returns: 0 on success
00263  *             -1 on errors opening or writing file; file is deleted
00264  */
00265 int dumpqueue( QUEUE *q, char *filename)
00266 {
00267   QUEUE_ENTRY *pQE;
00268   FILE *fp;
00269   long i;
00270   Q_POS p;
00271   int ret = 0;
00272   time_t timestamp;
00273   
00274   if ( (fp = fopen(filename, "wb")) == (FILE *)NULL)
00275     return( -1 );
00276   
00277   timestamp = time(0);
00278   if (fwrite(&timestamp, sizeof(time_t), 1, fp) < 1)
00279   {
00280     ret = -1;
00281     goto Cleanup;
00282   }
00283   if (fwrite(&q->NumOfElements, sizeof(long), 1, fp) < 1)
00284   {
00285     ret = -1;
00286     goto Cleanup;
00287   }
00288   /* Loop through all the queue entries */
00289   p = q->first;
00290   for (i = 0l; i < q->NumOfElements; i++, inc_circular(q, &p))
00291   {
00292     pQE = &(q->pQE[p]);
00293     if (fwrite(&pQE->queueLogo, sizeof(MSG_LOGO), 1, fp) < 1)
00294     {
00295       ret = -1;
00296       goto Cleanup;
00297     }
00298     if (fwrite(&pQE->length, sizeof(long), 1, fp) < 1)
00299     {
00300       ret = -1;
00301       goto Cleanup;
00302     }
00303     if ((long)fwrite(pQE->d, sizeof(char), pQE->length, fp) < pQE->length)
00304     {
00305       ret = -1;
00306       goto Cleanup;
00307     }
00308   }
00309   if (fwrite(&timestamp, sizeof(time_t), 1, fp) < 1)
00310   {
00311     ret = -1;
00312     goto Cleanup;
00313   }
00314   
00315  Cleanup:
00316   fclose(fp);
00317   if (ret < 0)
00318     unlink(filename);
00319 
00320   return( ret );
00321 }
00322 
00323     
00324 /*
00325  * undumpqueue: read a queue file previously written by dumpqueue().
00326  *              Contents of file will be inserted into the specified queue.
00327  *              The queue is assumed to be empty on entry to this function.
00328  *   Returns: 0 on success
00329  *           +1 if dump file does not exist
00330  *           -1 on errors opening or reading file
00331  *           -2 if there are more entries in the file than will fit in queue
00332  *           -3 if file entry is larger than will fit in queue
00333  *           -4 if timestamps at each end of file don't match
00334  */
00335 int undumpqueue( QUEUE *q, char *filename)
00336 {
00337   QUEUE_ENTRY *pQE;
00338   FILE *fp;
00339   long i, num_entries;
00340   int ret = 0;
00341   time_t timestamp1, timestamp2;
00342   
00343   if ( (fp = fopen(filename, "rb")) == (FILE *)NULL)
00344   {
00345     if (errno == ENOENT)
00346       return( +1);
00347     else
00348       return( -1 );
00349   }
00350   
00351   if (fread(&timestamp1, sizeof(time_t), 1, fp) < 1)
00352   {
00353     ret = -1;
00354     goto Cleanup;
00355   }
00356 
00357   if (fread(&num_entries, sizeof(long), 1, fp) < 1)
00358   {
00359     ret = -1;
00360     goto Cleanup;
00361   }
00362   if (num_entries > q->MyMaxElements)
00363   {
00364     ret = -2;
00365     goto Cleanup;
00366   }
00367   
00368   for (i = 0l; i < num_entries; i++)
00369   {
00370     pQE = &(q->pQE[q->last]);
00371     if (fread(&pQE->queueLogo, sizeof(MSG_LOGO), 1, fp) < 1)
00372     {
00373       ret = -1;
00374       goto Cleanup;
00375     }
00376     if (fread(&pQE->length, sizeof(long), 1, fp) < 1)
00377     {
00378       ret = -1;
00379       goto Cleanup;
00380     }
00381     if (pQE->length > q->MyMaxSize)
00382     {
00383       ret = -3;
00384       goto Cleanup;
00385     }
00386     if ((long)fread(pQE->d, sizeof(char), pQE->length, fp) < pQE->length)
00387     {
00388       ret = -1;
00389       goto Cleanup;
00390     }
00391     inc_circular(q, &q->last);
00392     q->NumOfElements++;
00393   }
00394   
00395   if (fread(&timestamp2, sizeof(time_t), 1, fp) < 1)
00396   {
00397     ret = -1;
00398     goto Cleanup;
00399   }
00400   if (timestamp2 != timestamp1)
00401   { /* Timestamps don't match; can't trust file, so reset the queue to empty */
00402     q->last = q->first = 0;
00403     q->NumOfElements = 0;
00404     ret = -4;
00405   }
00406   
00407  Cleanup:
00408   fclose(fp);
00409 
00410   return( ret );
00411 }
00412 
00413 /*
00414  * peekNextElement: return a pointer to the next element without changing it.
00415  *    Returns: pointer to data on success
00416  *             NULL if there are no elements in the queue
00417  */
00418 DATA peekNextElement( QUEUE *q )
00419 {
00420   if (q->NumOfElements > 0)
00421     return( q->pQE[q->first].d );
00422   else
00423     return( NULL );
00424 }

Generated on Tue May 6 09:16:04 2003 for Earthworm Libs by doxygen1.3-rc3