diff --git a/src/api/dcps/ccpp/code/ccpp_WaitSet.cpp b/src/api/dcps/ccpp/code/ccpp_WaitSet.cpp index 579d075..8c5a213 100644 --- a/src/api/dcps/ccpp/code/ccpp_WaitSet.cpp +++ b/src/api/dcps/ccpp/code/ccpp_WaitSet.cpp @@ -175,3 +175,15 @@ DDS::ReturnCode_t DDS::WaitSet::get_conditions ( return result; } +#if defined(WIN32) +HANDLE +#elif defined(_POSIX_C_SOURCE) +int +#endif +DDS::WaitSet::get_os_waitable_handle(){ + return gapi_waitSet_get_os_waitable_handle(_gapi_self); +} + +os_uint64 DDS::WaitSet::clear_os_waitable_handle_events(){ + return gapi_waitSet_clear_os_waitable_handle_events(_gapi_self); +} diff --git a/src/api/dcps/ccpp/include/ccpp_WaitSet.h b/src/api/dcps/ccpp/include/ccpp_WaitSet.h index 0d42e37..165fed4 100644 --- a/src/api/dcps/ccpp/include/ccpp_WaitSet.h +++ b/src/api/dcps/ccpp/include/ccpp_WaitSet.h @@ -52,6 +52,14 @@ namespace DDS ::DDS::ConditionSeq & attached_conditions ) THROW_ORB_EXCEPTIONS; + #if defined(WIN32) + HANDLE + #elif defined(_POSIX_C_SOURCE) + int + #endif + get_os_waitable_handle(); + + os_uint64 clear_os_waitable_handle_events(); }; } diff --git a/src/api/dcps/gapi/code/gapi_waitSet.c b/src/api/dcps/gapi/code/gapi_waitSet.c index 95f331b..f0ddf10 100644 --- a/src/api/dcps/gapi/code/gapi_waitSet.c +++ b/src/api/dcps/gapi/code/gapi_waitSet.c @@ -25,6 +25,11 @@ #include "v_query.h" #include "os_heap.h" #include "os_report.h" +#include "os_stdlib.h" +#include "os_socket.h" +#if defined(linux) +#include +#endif #define _ConditionEntry(o) \ ((_ConditionEntry)(o)) @@ -108,6 +113,14 @@ _WaitSetFree( } os_condDestroy(&waitset->cv); os_mutexDestroy(&waitset->mutex); +#if defined(WIN32) + CloseHandle(waitset->os_waitable_handle); +#elif defined(_POSIX_C_SOURCE) + close(waitset->os_waitable_handle); + #if !defined(linux) + close(waitset->os_waitable_handle_wpipe); + #endif +#endif return TRUE; } @@ -125,6 +138,14 @@ _WaitSetNew(void) newWaitSet->conditions = NULL; newWaitSet->length = 0; newWaitSet->domains = NULL; +#if defined(WIN32) + newWaitSet->os_waitable_handle = INVALID_HANDLE_VALUE; +#elif defined(_POSIX_C_SOURCE) + newWaitSet->os_waitable_handle = -1; + #if !defined(linux) + newWaitSet->os_waitable_handle_wpipe = -1; + #endif +#endif } if (newWaitSet) { @@ -158,6 +179,56 @@ _WaitSetNew(void) newWaitSet = NULL; } } + + if (newWaitSet) { +#if defined(WIN32) + newWaitSet->os_waitable_handle = CreateEvent(NULL, true, false, NULL); /*untested reasonable defaults?*/ + osResult = newWaitSet->os_waitable_handle == NULL ? GetLastError() : os_resultSuccess; /*TODO: demux GetLastError*/ +#elif defined(_POSIX_C_SOURCE) + #if defined(linux) + newWaitSet->os_waitable_handle = eventfd(0, EFD_NONBLOCK); + if(newWaitSet->os_waitable_handle == -1){ + switch(errno){ + case EINVAL:{ + osResult = os_resultInvalid; + break; + } + case EMFILE: + case ENFILE: + case ENODEV: + case ENOMEM:{ + osResult = os_resultFail; + break; + } + default:{ + osResult = os_resultFail; + break; + } + } + }else{ + osResult = os_resultSuccess; + } + #else + int pipes[2]; + osReslt = pipe(pipes); + newWaitSet->os_waitable_handle = pipes[0]; + newWaitSet->os_waitable_handle_wpipe = pipes[1]; + #endif +#endif + if (osResult == os_resultSuccess) { +#if defined(_POSIX_C_SOURCE) && !defined(linux) + osResult = os_sockSetNonBlocking(newWaitSet->os_waitable_handle, OS_TRUE); + if (osResult != os_resultSuccess) { + gapi_free(newWaitSet); + newWaitSet = NULL; + } +#endif + } else { + gapi_free(newWaitSet); + newWaitSet = NULL; + } + } + /* Add the new waitset to the DomainParticipantFactory, so we can cleanup * when the process terminates */ @@ -188,6 +259,22 @@ _WaitSetNotify( c_iterWalk(_this->domains, notifyDomain, (c_voidp)cond); } } + +#if defined(WIN32) + SetEvent(newWaitSet->os_waitable_handle); +#elif defined(_POSIX_C_SOURCE) + { + #if defined(linux) + os_uint64 c; + c = 1; + os_write(_this->os_waitable_handle, &c, sizeof(c)); + #else + os_char c; + c = 1; + os_write(_this->os_waitable_handle_wpipe, &c, sizeof(c)); + #endif + } +#endif } gapi_waitSet @@ -864,3 +951,46 @@ getConditionDomainId( return result; } +#if defined(WIN32) +HANDLE +#elif defined(_POSIX_C_SOURCE) +int +#endif +gapi_waitSet_get_os_waitable_handle(gapi_waitSet _this){ + gapi_returnCode_t result; + _WaitSet waitset; + waitset = gapi_waitSetClaim(_this, &result); + return waitset->os_waitable_handle; +} + +os_uint64 gapi_waitSet_clear_os_waitable_handle_events(gapi_waitSet _this){ + gapi_returnCode_t result; + _WaitSet waitset; + os_uint64 c = 0; + + waitset = gapi_waitSetClaim(_this, &result); + + if (waitset == NULL) { + return 0; + } +#if defined(WIN32) + ResetEvent(waitset->os_waitable_handle); +#elif defined(_POSIX_C_SOURCE) + #if defined(linux) + read(waitset->os_waitable_handle, &c, sizeof(c)); + #else + char buf[4096]; + for(;;){ + int res = read(waitset->os_waitable_handle, buf, sizeof(buf); + if(res > 0){ + c += res; + }else if(res == 0){ + break; + }else if(res == -1 && !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)){ + break; + } + } + #endif +#endif +return c; +} diff --git a/src/api/dcps/gapi/code/gapi_waitSet.h b/src/api/dcps/gapi/code/gapi_waitSet.h index b4f6ab7..adf4b38 100644 --- a/src/api/dcps/gapi/code/gapi_waitSet.h +++ b/src/api/dcps/gapi/code/gapi_waitSet.h @@ -42,6 +42,15 @@ C_STRUCT(_WaitSet) { c_voidp conditions; c_long length; c_iter domains; + +#if defined(WIN32) + HANDLE os_waitable_handle; +#elif defined(_POSIX_C_SOURCE) + int os_waitable_handle; + #if !defined(linux) + int os_waitable_handle_wpipe; + #endif +#endif }; _WaitSet diff --git a/src/api/dcps/gapi/include/gapi.h b/src/api/dcps/gapi/include/gapi.h index b692126..ed6bdf6 100644 --- a/src/api/dcps/gapi/include/gapi.h +++ b/src/api/dcps/gapi/include/gapi.h @@ -973,6 +973,16 @@ OS_API gapi_waitSet gapi_waitSet__alloc ( void); +OS_API +#if defined(WIN32) +HANDLE +#elif defined(_POSIX_C_SOURCE) +int +#endif +gapi_waitSet_get_os_waitable_handle(gapi_waitSet _this); + +OS_API os_uint64 gapi_waitSet_clear_os_waitable_handle_events(gapi_waitSet _this); + /* * interface GuardCondition : Condition */