GridLab
Grid Application Toolkit

A simple API for Grid Applications
GAT

Menu



Main Page   Alphabetical List   Compound List   File List   Compound Members   File Members  

pipe.c

Go to the documentation of this file.
00001 /** @file adaptor.c
00002  * Main file for the pipe adaptor.
00003  * 
00004  * Test adaptor to check the GATPipe API. Provides a simple
00005  * file GATPipeCPI implementation.
00006  * 
00007  * @date $Date: 2004/05/13 09:52:23 $
00008  * 
00009  * @version $Header: /export/cvs-gridlab/GridLabWeb/WorkPackages/wp-1/Doc/C-Reference/pipe_8c-source.html,v 1.6 2004/05/13 09:52:23 merzky Exp $
00010  *
00011  *  Copyright (C) Kelly Davis
00012  *  This file is part of the GAT Engine.
00013  *  Contributed by Kelly Davis <kdavis@aei.mpg.de>.
00014  *
00015  *  This library is free software; you can redistribute it and/or
00016  *  modify it under the terms of the GNU Lesser General Public
00017  *  License as published by the Free Software Foundation; either
00018  *  version 2.1 of the License, or (at your option) any later version.
00019  *
00020  *  The GNU C Library is distributed in the hope that it will be useful,
00021  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00022  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00023  *  Lesser General Public License for more details.
00024  *
00025  *  You should have received a copy of the GNU Lesser General Public
00026  *  License along with the GNU C Library; if not, write to the Free
00027  *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
00028  *  02111-1307 USA. Copyright notice.
00029  */
00030 
00031 static const char *rcsid = "$Header: /export/cvs-gridlab/GridLabWeb/WorkPackages/wp-1/Doc/C-Reference/pipe_8c-source.html,v 1.6 2004/05/13 09:52:23 merzky Exp $";
00032 
00033 /* System Header Files */
00034 #include <stdio.h>
00035 #include <stdlib.h>
00036 #include <string.h>
00037 
00038 #include <unistd.h>
00039 #if !defined(WIN32)
00040 #include <arpa/inet.h>
00041 #endif
00042 #include <sys/types.h>
00043 #include <sys/socket.h>
00044 #include <netinet/in.h>
00045 
00046 /* GAT Header Files */
00047 #include <GATCPI.h>
00048 
00049 /* Adaptor Header Files */
00050 #include "pipe.h"
00051 #include "Common.h"
00052 
00053 /* Macros */
00054 #define __countof(x) (sizeof(x)/sizeof(x[0]))
00055 
00056 /* Structures, unions and enums */
00057 
00058 /* Instance data */ 
00059 
00060 /* Static function prototypes */
00061 static void 
00062 pipe_adaptor_PipeCPI_Destroy(void *data);
00063 
00064 /* instance specific functions */
00065 GATResult 
00066 pipe_adaptor_PipeCPI_CreateInstance(void *adaptor_data, 
00067   GATPipeCPI_Instance *data, void *information);
00068 
00069 void 
00070 pipe_adaptor_PipeCPI_DestroyInstance(void *adaptor_data, 
00071   GATPipeCPI_Instance *data);
00072 
00073 GATResult 
00074 pipe_adaptor_PipeCPI_CloneInstance(void *adaptor_data, 
00075   GATPipeCPI_Instance const *data, 
00076   GATPipeCPI_Instance *new_data);
00077 
00078 GATResult 
00079 pipe_adaptor_PipeCPI_EqualsInstance(void *adaptor_data, 
00080   GATPipeCPI_Instance const *lhs, GATPipeCPI_Instance const *rhs, 
00081   GATBool *isequal);
00082 
00083 /* Streamable functions */
00084 static GATResult pipe_adaptor_PipeCPI_Read(void *data, 
00085   GATPipeCPI_Instance const *instance_data, void *buffer, GATuint32 size, GATuint32 *readBytes);
00086   
00087 static GATResult pipe_adaptor_PipeCPI_Write(void *data, 
00088   GATPipeCPI_Instance const *instance_data, void const *buffer, GATuint32 size, GATuint32 *writtenBytes);
00089   
00090 static GATResult pipe_adaptor_PipeCPI_Seek(void *data, 
00091   GATPipeCPI_Instance const *instance_data, GATOrigin origin, GATint32 offset, GATuint32 *new_position);
00092   
00093 static GATResult pipe_adaptor_PipeCPI_Close(void *data, 
00094   GATPipeCPI_Instance const *instance_data);
00095           
00096 /* Execute a GATPipe get_metrics operation */
00097 static GATResult 
00098 pipe_adaptor_PipeCPI_GetMetrics(void *data, 
00099   GATPipeCPI_Instance const *instance_data, GATList_GATMetric *metrics);
00100 
00101 /* Execute a GATPipe get_metric_event operation */
00102 static GATResult 
00103 pipe_adaptor_PipeCPI_GetMetricEvent(void *data, 
00104   GATPipeCPI_Instance const *instance_data, GATMetric metric, 
00105     GATMetricEvent *event);
00106     
00107 /* local helper functions */
00108 static GATResult 
00109 pipe_adaptor_PipeCPI_CreateServerInstance(void *adaptor_data, 
00110   GATPipeCPI_Instance *data, int clientSocket);
00111   
00112 static GATResult 
00113 pipe_adaptor_PipeCPI_CreateClientInstance(void *adaptor_data, 
00114   GATPipeCPI_Instance *data, int clientSocket);
00115   
00116 static GATResult 
00117 pipe_adaptor_FireReadEvent(GATContext context, 
00118   GATMonitorable_Impl monitorable, GATObject_const source, char const *name);
00119   
00120 static GATResult 
00121 pipe_adaptor_FireWriteEvent(GATContext context, 
00122   GATMonitorable_Impl monitorable, GATObject_const source, char const *name);
00123   
00124 static GATResult 
00125 pipe_adaptor_FireSeekEvent(GATMonitorable_Impl monitorable, 
00126   GATObject_const source, char const *name);
00127   
00128 static GATResult 
00129 pipe_adaptor_FireCloseEvent(GATContext context, 
00130   GATMonitorable_Impl monitorable, GATObject_const source, char const *name);
00131       
00132 /* File scope variables */
00133 static GATStaticMetric metric_data[] = {
00134   /* pipe.read event */
00135   { 
00136     "pipe.read",                    /* name */
00137     GATMeasurementType_EventLike,   /* type */
00138     GATType_String,                 /* data type */
00139     "",                             /* unit */
00140     0,                              /* parameter count */
00141     0                               /* parameters */
00142   },
00143   /* pipe.write event */
00144   { 
00145     "pipe.write",                   /* name */
00146     GATMeasurementType_EventLike,   /* type */
00147     GATType_String,                 /* data type */
00148     "",                             /* unit */
00149     0,                              /* parameter count */
00150     0                               /* parameters */
00151   },
00152   /* pipe.seek event */
00153   { 
00154     "pipe.seek",                    /* name */
00155     GATMeasurementType_EventLike,   /* type */
00156     GATType_String,                 /* data type */
00157     "",                             /* unit */
00158     0,                              /* parameter count */
00159     0                               /* parameters */
00160   },
00161   /* pipe.close event */
00162   { 
00163     "pipe.close",                   /* name */
00164     GATMeasurementType_EventLike,   /* type */
00165     GATType_String,                 /* data type */
00166     "",                             /* unit */
00167     0,                              /* parameter count */
00168     0                               /* parameters */
00169   },
00170 };
00171 
00172 /* symbolic constants for the metric positions inside the metric_data array */
00173 #define METRIC_PIPE_READ   0
00174 #define METRIC_PIPE_WRITE  1
00175 #define METRIC_PIPE_SEEK   2
00176 #define METRIC_PIPE_CLOSE  3
00177 
00178 
00179 /* External functions */
00180 
00181 /** pipe_adaptor_register
00182  *  Registers all CPIs this adaptor provides.
00183  *  This function is invoked by the loader in the GATEngine when
00184  *  an instance of this adaptor is loaded.  Each instance has its
00185  *  own private configuration table.
00186  *
00187  *  For every CPI provided the adapter has to hand all the corresponding 
00188  *  function pointers to the engine to allow to be called back under certain
00189  *  circumstances. Additionally the adaptor may allocate some private data for
00190  *  every loaded instance. This data is handed back to the adaptor as the first
00191  *  parameter to each of the subsequently called functions.
00192  *
00193  *  @param registry The registry the adaptor should register its CPIs with.
00194  *  @param system_config The system configuration table.
00195  *  @param instance_config The configuration table for this instance.
00196  *  @param token An arbitrary token used by the loader to identify this adaptor 
00197  *        instance
00198  *
00199  *  @return An error code.
00200  */
00201 GATResult 
00202 pipe_adaptor_register_pipe(GATContext error_context, GATRegistry registry, 
00203   GATTable_const system_config, GATTable_const instance_config, void *token)
00204 {
00205   GAT_USES_STATUS(error_context, "pipe_adaptor_register_pipe");
00206   
00207   GATPipeCPI cpi = NULL;
00208   GATPipeCPI_Data cpidata;
00209   
00210   GAT_UNUSED_PARAMETER(system_config);
00211   GAT_UNUSED_PARAMETER(instance_config);
00212   
00213   memset(&cpidata, 0, sizeof(GATPipeCPI_Data));
00214     
00215   /*
00216    *  provide the appropriate callback functions
00217    */
00218     
00219   /* adaptor instance data */
00220   cpidata.data = NULL;
00221   cpidata.destroy = pipe_adaptor_PipeCPI_Destroy;
00222   
00223   /*  Lifetime control and basic GATObject functionality support for every
00224    *  object instance created by the client.
00225    */
00226   cpidata.create_instance = pipe_adaptor_PipeCPI_CreateInstance;
00227   cpidata.destroy_instance = pipe_adaptor_PipeCPI_DestroyInstance;
00228   cpidata.clone_instance = pipe_adaptor_PipeCPI_CloneInstance;
00229   cpidata.equals_instance = pipe_adaptor_PipeCPI_EqualsInstance;
00230 
00231   /* streamable support */
00232   cpidata.read = pipe_adaptor_PipeCPI_Read;
00233   cpidata.write = pipe_adaptor_PipeCPI_Write;
00234   cpidata.seek = pipe_adaptor_PipeCPI_Seek;
00235   cpidata.close = pipe_adaptor_PipeCPI_Close;
00236   
00237   /* monitoring support */
00238   cpidata.get_metrics = pipe_adaptor_PipeCPI_GetMetrics;
00239   cpidata.get_metric_event = pipe_adaptor_PipeCPI_GetMetricEvent;
00240 
00241   /* Create a GATPipeCPI object. */
00242   cpi = GATPipeCPI_Create(GATPIPECPI_VERSION, &cpidata);
00243   if(NULL != cpi)
00244   {
00245     /* Need to pass preferences to allow matching
00246      * of a user's preferences with what this adaptor does.
00247      */
00248     GATPreferences preferences = GATPreferences_Create();
00249     if(NULL != preferences)
00250     {
00251       /* at least the Name preference should be defined */
00252       GAT_CREATE_STATUS(GATPreferences_Add(preferences, "Name", "pipe_adaptor"));
00253       GAT_CREATE_STATUS(GATPreferences_Add(preferences, "Security", "none"));
00254       GAT_CREATE_STATUS(GATPreferences_Add(preferences, "Local", "true"));
00255       
00256       GAT_CREATE_STATUS(GATRegistry_AddGATPipeCPI(registry, cpi, token, 
00257         preferences));
00258       
00259       GATPreferences_Destroy(&preferences);
00260     }
00261     else
00262     {
00263       GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00264     }
00265   }
00266   else
00267   {
00268     GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00269   }
00270   
00271   if (GAT_FAILED(GAT_CURRENT_STATUS()))
00272   {
00273     GATPipeCPI_Destroy(&cpi);
00274   }
00275 
00276   return GAT_RETURN_STATUS();
00277 }
00278 
00279 /* Local functions */
00280 
00281 /** pipe_adaptor_PipeCPI_Destroy
00282  *  Destroy the adaptor-provided data object on GATPipeCPI destruction.
00283  *  When the GATPipeCPI object created in this adaptor's _register
00284  *  function is destroyed, this function is invoked to allow the
00285  *  adaptor to cleanup.
00286  *
00287  *  @param data Adaptor-provided data object.
00288  */
00289 static void 
00290 pipe_adaptor_PipeCPI_Destroy(void *data)
00291 {
00292   GAT_UNUSED_PARAMETER(data);
00293 }
00294 
00295 /** pipe_adaptor_PipeCPI_CreateInstance
00296  *  Create a new CPI object instance.
00297  *  Adaptor implementation of create instance capability.
00298  *
00299  * @param adaptor_data Adaptor-provided data object.
00300  * @param data The pointer to the data structure containing at least 
00301  *        the constructor provided data items and a member instance_data, which 
00302  *        may be used by the adaptor to store CPI instance specific data.
00303  * @param information Additional infor required
00304  * @return An error code.
00305  */
00306 GATResult 
00307 pipe_adaptor_PipeCPI_CreateInstance(void *adaptor_data, 
00308   GATPipeCPI_Instance *data, void *information)
00309 {
00310   if (NULL != data)
00311   {
00312     GAT_USES_STATUS(data->context, "pipe_adaptor_PipeCPI_CreateInstance");
00313     
00314     if (NULL != information)
00315     {
00316       GATPipeAdaptorInfo *gatPipeAdaptorInfo = 
00317         (GATPipeAdaptorInfo *)information;
00318       
00319       if (GATTrue == gatPipeAdaptorInfo->isClientInfo)
00320       {
00321         GAT_CREATE_STATUS(pipe_adaptor_PipeCPI_CreateClientInstance( 
00322           adaptor_data, data, gatPipeAdaptorInfo->clientSocket));
00323 //          gatPipeAdaptorInfo->serverNodename, 
00324 //          gatPipeAdaptorInfo->serverPort));
00325       }
00326       else
00327       {
00328         GAT_CREATE_STATUS(pipe_adaptor_PipeCPI_CreateServerInstance( 
00329           adaptor_data, data, gatPipeAdaptorInfo->clientSocket));
00330       }
00331     }
00332     else
00333     {
00334       GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00335     }
00336     
00337     return GAT_RETURN_STATUS();
00338   }
00339   return GAT_INVALID_HANDLE;
00340 }
00341 
00342 /** pipe_adaptor_PipeCPI_CreateServerInstance
00343  *  Create a new CPI object instance.
00344  *  Adaptor implementation of create instance capability.
00345  *
00346  * @param adaptor_data Adaptor-provided data object.
00347  * @param data The pointer to the data structure containing at least 
00348  *        the constructor provided data items and a member instance_data, which 
00349  *        may be used by the adaptor to store CPI instance specific data.
00350  * @param clientSocket The client socket
00351  * @return An error code.
00352  */
00353 static GATResult 
00354 pipe_adaptor_PipeCPI_CreateServerInstance(void *adaptor_data, 
00355   GATPipeCPI_Instance *data, int clientSocket)
00356 {
00357   GAT_UNUSED_PARAMETER(adaptor_data);
00358   if (NULL != data)
00359   {
00360     GAT_USES_STATUS(data->context, "pipe_adaptor_PipeCPI_CreateServerInstance");
00361   
00362     int *newClientSocket = (int *) malloc(sizeof(int));
00363     if (NULL != newClientSocket)
00364     {
00365       *newClientSocket = clientSocket;
00366       data->instance_data = (void *) newClientSocket;
00367     }
00368     else
00369     {
00370       GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00371     }
00372     
00373     return GAT_RETURN_STATUS();
00374   }
00375   return GAT_INVALID_HANDLE;
00376 }
00377 
00378 /** pipe_adaptor_PipeCPI_CreateClientInstance
00379  *  Create a new CPI object instance.
00380  *  Adaptor implementation of create instance capability.
00381  *
00382  * @param adaptor_data Adaptor-provided data object.
00383  * @param data The pointer to the data structure containing at least 
00384  *        the constructor provided data items and a member instance_data, which 
00385  *        may be used by the adaptor to store CPI instance specific data.
00386  * @param serverNodename The server nodename
00387  * @param serverPort The server port
00388  * @return An error code.
00389  */
00390 static GATResult 
00391 pipe_adaptor_PipeCPI_CreateClientInstance(void *adaptor_data, 
00392   GATPipeCPI_Instance *data, int clientSocket)
00393 {
00394   GAT_UNUSED_PARAMETER(adaptor_data);
00395   if (NULL != data)
00396   {
00397     GAT_USES_STATUS(data->context, "pipe_adaptor_PipeCPI_CreateClientInstance");
00398   
00399     int *newClientSocket = (int *) malloc(sizeof(int));
00400     if (NULL != newClientSocket)
00401     {
00402       *newClientSocket = clientSocket;
00403       data->instance_data = (void *) newClientSocket;
00404     }
00405     else
00406     {
00407       GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00408     }
00409     
00410     return GAT_RETURN_STATUS();
00411   }
00412   return GAT_INVALID_HANDLE;
00413 }
00414 
00415 //  GAT_UNUSED_PARAMETER(adaptor_data);
00416 //  
00417 //  if (NULL != data) 
00418 //  {
00419 //    GAT_USES_STATUS(data->context, "pipe_adaptor_PipeCPI_CreateClientInstance");
00420 //    
00421 //    if (NULL != serverNodename && 0 <= serverPort)
00422 //    {
00423 //      int *newClientSocket = (int *) malloc(sizeof(int));
00424 //      if (NULL != newClientSocket)
00425 //      {
00426 //        *newClientSocket = socket(AF_INET, SOCK_STREAM, 0);
00427 //        if (0 <= *newClientSocket)
00428 //        {
00429 //          struct sockaddr_in serverAddress;
00430 //          struct hostent *hostend = NULL;
00431 //          
00432 //          memset(&serverAddress, 0, sizeof(serverAddress));
00433 //          serverAddress.sin_family = AF_INET;
00434 //          hostend = gethostbyname(serverNodename);
00435 //          serverAddress.sin_addr = *((struct in_addr *)hostend->h_addr);;
00436 //          serverAddress.sin_port = htons(serverPort);
00437 //
00438 //          if (INADDR_NONE != serverAddress.sin_addr.s_addr)
00439 //          {
00440 //            int connectReturn;
00441 //            
00442 //            connectReturn = connect(*newClientSocket, 
00443 //              (struct sockaddr *) &serverAddress, sizeof(serverAddress));
00444 //            if (0 <= connectReturn)
00445 //            {
00446 //              data->instance_data = (void *) newClientSocket;
00447 //            }
00448 //            else
00449 //            {
00450 //              GAT_CREATE_STATUS(GAT_FAIL);
00451 //            }
00452 //          }
00453 //          else
00454 //          {
00455 //            GAT_CREATE_STATUS(GAT_FAIL);
00456 //          }
00457 //        }
00458 //        else
00459 //        {
00460 //          GAT_CREATE_STATUS(GAT_FAIL);
00461 //        }
00462 //      
00463 //        if (GAT_FAILED(GAT_CURRENT_STATUS()))
00464 //        {
00465 //          close(*newClientSocket);
00466 //          free(newClientSocket);
00467 //        }
00468 //      }
00469 //      else
00470 //      {
00471 //        GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00472 //      }
00473 //    }
00474 //    else
00475 //    {
00476 //      GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00477 //    }
00478 //    return GAT_RETURN_STATUS();
00479 //  }  
00480 //  return GAT_INVALID_HANDLE;
00481 //}
00482 
00483 /** pipe_adaptor_PipeCPI_DestroyInstance
00484  *  Destroy a CPI object instance.
00485  *  Adaptor implementation of destroy instance capability.
00486  *
00487  * @param data Adaptor-provided data object.
00488  * @param instance_data The instance data of the CPI object to destroy.
00489  */
00490 void
00491 pipe_adaptor_PipeCPI_DestroyInstance(void *adaptor_data, 
00492   GATPipeCPI_Instance *data)
00493 {
00494   GAT_UNUSED_PARAMETER(adaptor_data);
00495   
00496   if(NULL != data)
00497   {
00498     int *serverSocket = (int *)data->instance_data;
00499     if (0 != *serverSocket)
00500     {
00501       close(*serverSocket);
00502       *serverSocket = 0;
00503     }
00504     free(data->instance_data);
00505   }  
00506 }
00507 
00508 /** pipe_adaptor_PipeCPI_CloneInstance
00509  *  Clone a CPI object instance.
00510  *  Adaptor implementation of the clone instance capability.
00511  *
00512  * @param data Adaptor-provided data object.
00513  * @param instance_data The instance data of this CPI object
00514  * @param new_instance_data The pointer to the variable, where the instance 
00515  *      data of the cloned CPI object is to be returned to.
00516  * 
00517  * @return An error code.
00518  */
00519 GATResult 
00520 pipe_adaptor_PipeCPI_CloneInstance(void *adaptor_data, 
00521   GATPipeCPI_Instance const *instance_data, 
00522   GATPipeCPI_Instance *new_instance_data)
00523 {
00524   GAT_UNUSED_PARAMETER(adaptor_data);
00525   if (NULL != instance_data)
00526   {
00527     GAT_USES_STATUS(instance_data->context, "pipe_adaptor_PipeCPI_CloneInstance");
00528     
00529     if (NULL != new_instance_data)
00530     {
00531       new_instance_data->instance_data = malloc(sizeof(int));
00532       if (NULL != new_instance_data->instance_data)
00533       {
00534         int *client_socket = (int *)instance_data->instance_data;
00535         int *new_client_socket = (int *)new_instance_data->instance_data;
00536         
00537         *new_client_socket = *client_socket;
00538       }
00539       else
00540       {
00541         GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00542       }
00543     }
00544     else
00545     {
00546       GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00547     }
00548         
00549     return GAT_RETURN_STATUS();
00550   }
00551   return GAT_INVALID_HANDLE;
00552 }
00553 
00554 /** pipe_adaptor_PipeCPI_EqualsInstance
00555  *  Clone a CPI object instance.
00556  *  Adaptor implementation of the clone instance capability.
00557  *
00558  * @param data Adaptor-provided data object.
00559  * @param lhs The instance data of the left CPI object.
00560  * @param lhs The instance data of the right CPI object.
00561  * @param isequal The pointer to the variable, where the result is to be 
00562  *        returned to.
00563  *
00564  * @return An error code.
00565  */
00566 GATResult 
00567 pipe_adaptor_PipeCPI_EqualsInstance(void *data, 
00568   GATPipeCPI_Instance const *lhs, GATPipeCPI_Instance const *rhs, 
00569   GATBool *isequal)
00570 {
00571   GATResult retval = GAT_INVALID_HANDLE;
00572   
00573   GAT_UNUSED_PARAMETER(data);
00574   
00575   if ( (NULL != lhs) && (NULL != rhs) && (NULL != isequal) )
00576   {
00577     *isequal = GATFalse;
00578     retval = GAT_SUCCESS;
00579     if( lhs == rhs )
00580     {
00581       *isequal = GATTrue;
00582       retval = GAT_SUCCESS;
00583     }
00584   }
00585   return retval;
00586 }
00587 
00588 
00589 /* Streamable support !!! */
00590 
00591 /** pipe_adaptor_PipeCPI_Read
00592  *  Reads data from a Pipe.
00593  *  Adaptor implementation of Pipe read capability.
00594  *
00595  * @param data Adaptor-provided data object.
00596  * @param instance_data The instance data of this CPI object
00597  * @param buffer The buffer into which data is read
00598  * @param size The size of the buffer
00599  * @param readBytes the number of bytes read
00600  * @return An error code.
00601  */
00602 static GATResult 
00603 pipe_adaptor_PipeCPI_Read(void *data, 
00604   GATPipeCPI_Instance const *instance_data, void *buffer, GATuint32 size, 
00605   GATuint32 *readBytes)
00606 {
00607   GAT_UNUSED_PARAMETER(data);
00608   if (NULL != instance_data)
00609   {
00610     GAT_USES_STATUS(instance_data->context, "pipe_adaptor_PipeCPI_Read");
00611   
00612     if (NULL != buffer && NULL != readBytes)
00613     {
00614       int *localSocket = (int *) instance_data->instance_data;
00615       int localReadBytes = recv (*localSocket, buffer, (size_t) size, 0);
00616       if (-1 != localReadBytes)
00617       {
00618         GAT_CREATE_STATUS(pipe_adaptor_FireReadEvent(
00619           instance_data->context,
00620           instance_data->monitorable, instance_data->source, "read"));
00621       }
00622       *readBytes = (GATuint32) localReadBytes;
00623     }
00624     else
00625     {
00626       GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00627     }
00628     
00629     return GAT_RETURN_STATUS();
00630   }  
00631   return GAT_INVALID_HANDLE;
00632 }
00633 
00634 /** pipe_adaptor_PipeCPI_Write
00635  *  Writes data to a Pipe.
00636  *  Adaptor implementation of Pipe write capability.
00637  *
00638  * @param data Adaptor-provided data object.
00639  * @param instance_data The instance data of this CPI object
00640  * @param buffer The buffer from which data is written
00641  * @param size The size of the buffer
00642  * @param writtenBytes the number of bytes written
00643  * @return An error code.
00644  */
00645 static GATResult 
00646   pipe_adaptor_PipeCPI_Write(void *data, 
00647   GATPipeCPI_Instance const *instance_data, void const *buffer, 
00648   GATuint32 size, GATuint32 *writtenBytes)
00649 {
00650   GAT_UNUSED_PARAMETER(data);
00651   if (NULL != instance_data)
00652   {
00653     GAT_USES_STATUS(instance_data->context, "");
00654       
00655     if (NULL != buffer && NULL != writtenBytes)
00656     {
00657       int *localSocket = (int *) instance_data->instance_data;
00658       int localWrittenBytes = send(*localSocket, buffer, (size_t)size, 0);
00659       if (-1 != localWrittenBytes)
00660       {
00661         GAT_CREATE_STATUS(pipe_adaptor_FireWriteEvent(
00662           instance_data->context,
00663           instance_data->monitorable, instance_data->source, "write"));
00664       }
00665       *writtenBytes = (GATuint32) localWrittenBytes;
00666     }
00667     else
00668     {
00669       GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00670     }
00671     
00672     return GAT_RETURN_STATUS();
00673   }  
00674   return GAT_INVALID_HANDLE;
00675 }
00676 
00677 /** pipe_adaptor_PipeCPI_Seek
00678  * Seeks on a Pipe.
00679  * Adaptor implementation of Pipe seek capability.
00680  *
00681  * @param data Adaptor-provided data object.
00682  * @param instance_data The instance data of this CPI object
00683  * @param origin The GATOrigin from whence to seek
00684  * @param offset The offset from the origin which to seek to
00685  * @param new_position The new position of the Pipe
00686  * @return An error code.
00687  */
00688 static GATResult pipe_adaptor_PipeCPI_Seek(void *data, 
00689   GATPipeCPI_Instance const *instance_data, GATOrigin origin, GATint32 offset, GATuint32 *new_position)
00690 {
00691   GAT_UNUSED_PARAMETER(data);
00692   GAT_UNUSED_PARAMETER(origin);
00693   GAT_UNUSED_PARAMETER(offset);
00694   GAT_UNUSED_PARAMETER(new_position);
00695   
00696   pipe_adaptor_FireSeekEvent( instance_data->monitorable, instance_data->source, "seek" );
00697   
00698   return GAT_NOTIMPL;
00699 }
00700 
00701 /** pipe_adaptor_PipeCPI_Close
00702  * Closes a Pipe
00703  * Adaptor implementation of Pipe close capability.
00704  *
00705  * @param data Adaptor-provided data object.
00706  * @param instance_data The instance data of this CPI object
00707  * @return An error code.
00708  */
00709 static GATResult 
00710 pipe_adaptor_PipeCPI_Close(void *data, 
00711   GATPipeCPI_Instance const *instance_data)
00712 {
00713   GAT_UNUSED_PARAMETER(data);
00714   if (NULL != instance_data)
00715   {
00716     GAT_USES_STATUS(instance_data->context, "pipe_adaptor_PipeCPI_Close");
00717     
00718     int *localSocket = (int *) instance_data->instance_data;
00719 
00720     if (NULL != instance_data->instance_data)
00721     {
00722       close(*localSocket);
00723       *localSocket = 0;
00724     
00725       GAT_CREATE_STATUS(pipe_adaptor_FireCloseEvent(instance_data->context,
00726         instance_data->monitorable, instance_data->source, "close"));
00727     }
00728     else
00729     {
00730       GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00731     }
00732     
00733     return GAT_RETURN_STATUS();
00734   }  
00735   return GAT_INVALID_HANDLE;
00736 }
00737 
00738 
00739 /* Monitoring support */ 
00740 
00741 /** pipe_adaptor_PipeCPI_GetMetrics
00742  *
00743  *  The function pipe_adaptor_PipeCPI_GetMetrics should return a 
00744  *  list of GATMetric objects supported by this adaptor. I.e. the adaptor is 
00745  *  capable to fire GATMetricEvents for the returned metrics. This list should 
00746  *  include all the supported metrics, event like and continuous ones.
00747  *
00748  *  @param data Adaptor-provided data object.
00749  *  @param instance_data Instance data for the CPI provider object.
00750  *  @param metrics The pointer to the variable, which receives the returned 
00751  *        list of metrics.
00752  *
00753  *  @return An error code.
00754  */
00755 static GATResult 
00756 pipe_adaptor_PipeCPI_GetMetrics(void *data, 
00757   GATPipeCPI_Instance const *instance_data, GATList_GATMetric *metrics)
00758 {
00759   GAT_UNUSED_PARAMETER(data);
00760   GAT_UNUSED_PARAMETER(instance_data);
00761   
00762   return GATMetric_CreateListOfMetrics(metric_data, __countof(metric_data), metrics);
00763 }
00764 
00765 /** pipe_adaptor_PipeCPI_GetMetricEvent
00766  *
00767  *  The function pipe_adaptor_PipeCPI_GetMetricEvent should 
00768  *  return the GATMetricEvent associated with the given continuous metric. This 
00769  *  function gets called for continuous metrics only, since returning the 
00770  *  metric event object to the caller is th only way for the client to get 
00771  *  access to it.
00772  *
00773  *  @param data Adaptor-provided data object.
00774  *  @param instance_data Instance data for the CPI provider object.
00775  *  @param metric The metric instance describing the metric event to return.
00776  *        This metric should be equivalent to one of the metrics returned by
00777  *        our own GetMetrics CPI function (see above). If this is another
00778  *        (not known to us metric), an error should be returned.
00779  *  @param event The pointer to the variable, which receives the metric event 
00780  *        to return.
00781  *
00782  *  @return An error code.
00783  */
00784 static GATResult 
00785 pipe_adaptor_PipeCPI_GetMetricEvent(void *data, 
00786   GATPipeCPI_Instance const *instance_data, GATMetric metric, 
00787   GATMetricEvent *event)
00788 {
00789   /* this adaptor supports no continuous metrics */
00790   
00791   GAT_UNUSED_PARAMETER(data);
00792   GAT_UNUSED_PARAMETER(instance_data);
00793   GAT_UNUSED_PARAMETER(metric);
00794   GAT_UNUSED_PARAMETER(event);
00795   
00796   return GAT_SUCCESS;
00797 }
00798 
00799 
00800 /** pipe_adaptor_FireReadEvent
00801  *
00802  *  The function pipe_adaptor_FireReadEvent fires a 
00803  *  'pipe read' event to all registered metric listeners.
00804  */
00805 static GATResult 
00806 pipe_adaptor_FireReadEvent(GATContext context, 
00807   GATMonitorable_Impl monitorable, GATObject_const source, char const *name)
00808 {
00809   GAT_USES_STATUS(context, "pipe_adaptor_FireReadEvent");
00810 
00811   if (NULL != monitorable && NULL != source && NULL != name)
00812   {
00813     GATMetric metric = NULL;
00814     
00815     GAT_CREATE_STATUS(GATMetric_CreateMetric(&metric_data[METRIC_PIPE_READ], 
00816       &metric));
00817     if (GAT_SUCCEEDED(GAT_CURRENT_STATUS()))
00818     {
00819       GATMetricEvent event = GATMetricEvent_Create_EventLike(source, metric, 
00820         (void *)name, (GATuint32)(strlen(name) + 1));
00821         
00822       if (NULL != event)
00823       {
00824         GAT_CREATE_STATUS(GATMonitorable_Impl_FireEvent(monitorable, metric, 
00825           event));
00826         GATMetricEvent_Destroy(&event);
00827       }
00828       else
00829       {
00830         GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00831       }
00832       
00833       GATMetric_Destroy(&metric);
00834     }
00835   }
00836   else
00837   {
00838     GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00839   }
00840   
00841   return GAT_RETURN_STATUS();
00842 }
00843 
00844 /** pipe_adaptor_FireWriteEvent
00845  *
00846  *  The function pipe_adaptor_FireWriteEvent fires a 
00847  *  'pipe write' event to all registered metric listeners.
00848  */
00849 static GATResult 
00850 pipe_adaptor_FireWriteEvent(GATContext context, 
00851   GATMonitorable_Impl monitorable, GATObject_const source, char const *name)
00852 {
00853   GAT_USES_STATUS(context, "pipe_adaptor_FireWriteEvent");
00854 
00855   if (NULL != monitorable && NULL != source && NULL != name)
00856   {
00857     GATMetric metric = NULL;
00858     GAT_CREATE_STATUS(GATMetric_CreateMetric(&metric_data[METRIC_PIPE_WRITE], 
00859       &metric));
00860 
00861     if (GAT_SUCCEEDED(GAT_CURRENT_STATUS()))
00862     {
00863       GATMetricEvent event = GATMetricEvent_Create_EventLike(source, metric, 
00864         (void *)name, (GATuint32)(strlen(name) + 1));
00865         
00866       if (NULL != event)
00867       {
00868         GAT_CREATE_STATUS(GATMonitorable_Impl_FireEvent(monitorable, metric, 
00869           event));
00870         GATMetricEvent_Destroy(&event);
00871       }
00872       else
00873       {
00874         GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00875       }
00876       
00877       GATMetric_Destroy(&metric);
00878     }
00879   }
00880   else
00881   {
00882     GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00883   }
00884 
00885   return GAT_RETURN_STATUS();
00886 }
00887 
00888 /** pipe_adaptor_FireSeekEvent
00889  *
00890  *  The function pipe_adaptor_FireSeekEvent fires a 
00891  *  'pipe seek' event to all registered metric listeners.
00892  */
00893 static GATResult 
00894 pipe_adaptor_FireSeekEvent(GATMonitorable_Impl monitorable, 
00895   GATObject_const source, char const *name)
00896 {
00897   GATResult retval = GAT_INVALID_PARAMETER;
00898   if (NULL != monitorable && NULL != source && NULL != name)
00899   {
00900     GATMetric metric = NULL;
00901     
00902     retval = GATMetric_CreateMetric(&metric_data[METRIC_PIPE_SEEK], &metric);
00903     if (GAT_SUCCESS == retval)
00904     {
00905       GATMetricEvent event = GATMetricEvent_Create_EventLike(source, metric, 
00906         (void *)name, (GATuint32)(strlen(name) + 1));
00907         
00908       if (NULL != event)
00909       {
00910         retval = GATMonitorable_Impl_FireEvent(monitorable, metric, event);
00911       }
00912       else
00913       {
00914         retval = GAT_MEMORYFAILURE;
00915       }
00916       
00917       GATMetricEvent_Destroy(&event);
00918       GATMetric_Destroy(&metric);
00919     }
00920   }
00921   return retval;
00922 }
00923 
00924 /** pipe_adaptor_FireCloseEvent
00925  *
00926  *  The function pipe_adaptor_FireCloseEvent fires a 
00927  *  'pipe close' event to all registered metric listeners.
00928  */
00929 static GATResult 
00930 pipe_adaptor_FireCloseEvent(GATContext context, GATMonitorable_Impl monitorable, 
00931   GATObject_const source, char const *name)
00932 {
00933   GAT_USES_STATUS(context, "pipe_adaptor_FireCloseEvent");
00934   
00935   if (NULL != monitorable && NULL != source && NULL != name)
00936   {
00937     GATMetric metric = NULL;
00938     
00939     GAT_CREATE_STATUS(GATMetric_CreateMetric(&metric_data[METRIC_PIPE_CLOSE], 
00940       &metric));
00941     if (GAT_SUCCEEDED(GAT_CURRENT_STATUS()))
00942     {
00943       GATMetricEvent event = GATMetricEvent_Create_EventLike(source, metric, 
00944         (void *)name, (GATuint32)(strlen(name) + 1));
00945         
00946       if (NULL != event)
00947       {
00948         GAT_CREATE_STATUS(GATMonitorable_Impl_FireEvent(monitorable, metric, 
00949           event));
00950         GATMetricEvent_Destroy(&event);
00951       }
00952       else
00953       {
00954         GAT_CREATE_STATUS(GAT_MEMORYFAILURE);
00955       }
00956       
00957       GATMetric_Destroy(&metric);
00958     }
00959   }
00960   else
00961   {
00962     GAT_CREATE_STATUS(GAT_INVALID_PARAMETER);
00963   }
00964   
00965   return GAT_RETURN_STATUS();
00966 }