diff --git a/source/dcautil/Makefile.am b/source/dcautil/Makefile.am index 333bcf2b..c6170fd5 100644 --- a/source/dcautil/Makefile.am +++ b/source/dcautil/Makefile.am @@ -34,6 +34,7 @@ libdcautil_la_CFLAGS += -fPIC -I${PKG_CONFIG_SYSROOT_DIR}$(includedir)/dbus-1.0 -I${top_srcdir}/source/ccspinterface \ -I${top_srcdir}/source/utils \ -I${top_srcdir}/source/bulkdata \ + -I${top_srcdir}/source/protocol/http \ -I${PKG_CONFIG_SYSROOT_DIR}$(includedir)/ccsp libdcautil_la_DEPENDENCIES = ${top_builddir}/source/ccspinterface/libccspinterface.la ${top_builddir}/source/utils/libt2utils.la diff --git a/source/dcautil/dca.c b/source/dcautil/dca.c index c31df381..b647df24 100644 --- a/source/dcautil/dca.c +++ b/source/dcautil/dca.c @@ -47,6 +47,7 @@ #include "t2log_wrapper.h" #include "t2common.h" #include "busInterface.h" +#include "multicurlinterface.h" static bool check_rotated_logs = false; // using this variable to indicate whether it needs to check the rotated logs or not . Initialising it with false. @@ -59,6 +60,12 @@ static bool firstreport_after_bootup = false; // the rotated logs check should r #define BUFFER_SIZE 4096 // TODO fine tune this value based on the size of the data #define LARGE_FILE_THRESHOLD 1000000 // 1MB #define MAX_TIMESTAMP_LENGTH 24 + +/* + * CPU contention avoidance window: + * prevent overlap between top/process sampling and telemetry curl work. + */ +#define DCA_SAMPLING_CURL_WAIT_TIMEOUT_MS 2000 /** * @addtogroup DCA_TYPES * @{ @@ -256,6 +263,7 @@ static time_t extractUnixTimestamp(const char* line_start, size_t line_length) int processTopPattern(char* profileName, Vector* topMarkerList, int profileExecCounter) { T2Debug("%s ++in\n", __FUNCTION__); + bool isSamplingActive = false; if(profileName == NULL || topMarkerList == NULL) { T2Error("Invalid arguments for %s\n", __FUNCTION__); @@ -314,6 +322,18 @@ int processTopPattern(char* profileName, Vector* topMarkerList, int profileExec for (; var < vCount; ++var) // Loop of marker list starts here { + if(!isSamplingActive) + { + /* + * Enter sampling window so new curl work is deferred while + * top/process markers are being collected. + */ + if(http_pool_begin_sampling_window(DCA_SAMPLING_CURL_WAIT_TIMEOUT_MS) == T2ERROR_SUCCESS) + { + isSamplingActive = true; + } + } + TopMarker* topMarkerObj = (TopMarker*) Vector_At(topMarkerList, var); if (!topMarkerObj || !topMarkerObj->logFile || !topMarkerObj->searchString || !topMarkerObj->markerName) { @@ -370,6 +390,13 @@ int processTopPattern(char* profileName, Vector* topMarkerList, int profileExec } + + if(isSamplingActive) + { + /* Leave sampling window and allow deferred curl work to proceed. */ + http_pool_end_sampling_window(); + } + #if !defined(ENABLE_RDKC_SUPPORT) && !defined(ENABLE_RDKB_SUPPORT) removeTopOutput(filename); #endif diff --git a/source/protocol/http/multicurlinterface.c b/source/protocol/http/multicurlinterface.c index fe49ecff..9ab3debe 100644 --- a/source/protocol/http/multicurlinterface.c +++ b/source/protocol/http/multicurlinterface.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include "multicurlinterface.h" @@ -72,8 +73,11 @@ #define HTTP_RESPONSE_FILE "/tmp/httpOutput.txt" #define POOL_ACQUIRE_TIMEOUT_SEC 35 #define POOL_ACQUIRE_RETRY_MS 100 +/* Maximum wait for in-flight requests to drain when DCA sampling starts. */ +#define SAMPLE_WINDOW_DRAIN_RETRY_MS 100 static bool pool_initialized = false; static bool pool_shutting_down = false; +static unsigned int sampling_window_refcount = 0; // pool_mutex protects pool state and synchronizes access to pool entries static pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -95,6 +99,95 @@ static struct curl_slist *post_headers = NULL; // Shared POST headers static int pool_size = 0; // Number of pool entries static unsigned int active_requests = 0; // Number of in-flight curl_easy_perform() calls +/* + * CPU contention avoidance window: + * while DCA top sampling is active, defer new curl acquisitions to avoid + * running two CPU-intensive telemetry paths in parallel. + */ +T2ERROR http_pool_begin_sampling_window(unsigned int timeout_ms) +{ + struct timespec start_time, current_time; + bool timeout_reached = false; + + if (clock_gettime(CLOCK_MONOTONIC, &start_time) != 0) + { + T2Error("clock_gettime failed for start_time in %s: %s\n", __FUNCTION__, strerror(errno)); + return T2ERROR_FAILURE; + } + + pthread_mutex_lock(&pool_mutex); + sampling_window_refcount++; + pthread_mutex_unlock(&pool_mutex); + + while (1) + { + unsigned int pending = 0; + + pthread_mutex_lock(&pool_mutex); + pending = active_requests; + pthread_mutex_unlock(&pool_mutex); + + if (pending == 0) + { + break; + } + + if (timeout_ms > 0) + { + unsigned int elapsed_ms = 0; + + if (clock_gettime(CLOCK_MONOTONIC, ¤t_time) != 0) + { + T2Error("clock_gettime failed for current_time in %s: %s\n", __FUNCTION__, strerror(errno)); + break; + } + + long sec = current_time.tv_sec - start_time.tv_sec; + long nsec = current_time.tv_nsec - start_time.tv_nsec; + if (nsec < 0) + { + sec--; + nsec += 1000000000L; + } + elapsed_ms = (unsigned int)(sec * 1000L + (nsec / 1000000L)); + + if (elapsed_ms >= timeout_ms) + { + timeout_reached = true; + break; + } + } + + usleep(SAMPLE_WINDOW_DRAIN_RETRY_MS * 1000); + } + + if (timeout_reached) + { + unsigned int pending_requests = 0; + pthread_mutex_lock(&pool_mutex); + pending_requests = active_requests; + pthread_mutex_unlock(&pool_mutex); + T2Warning("Sampling window started with %u active request(s) still in flight after %u ms\n", + pending_requests, timeout_ms); + } + + return T2ERROR_SUCCESS; +} + +void http_pool_end_sampling_window(void) +{ + pthread_mutex_lock(&pool_mutex); + if (sampling_window_refcount > 0) + { + sampling_window_refcount--; + } + else + { + T2Warning("http_pool_end_sampling_window called with refcount already 0\n"); + } + pthread_mutex_unlock(&pool_mutex); +} + #ifdef LIBRDKCERTSEL_BUILD #if defined(ENABLE_RED_RECOVERY_SUPPORT) bool isStateRedEnabled(void) @@ -413,6 +506,26 @@ static T2ERROR acquire_pool_handle(CURL **easy, int *idx) return T2ERROR_FAILURE; } + if (sampling_window_refcount > 0) + { + pthread_mutex_unlock(&pool_mutex); + + if (clock_gettime(CLOCK_MONOTONIC, ¤t_time) != 0) + { + T2Error("clock_gettime failed for current_time: %s\n", strerror(errno)); + return T2ERROR_FAILURE; + } + + if ((current_time.tv_sec - start_time.tv_sec) >= POOL_ACQUIRE_TIMEOUT_SEC) + { + T2Error("Timeout waiting for sampling window to complete, treating as upload failure\n"); + return T2ERROR_FAILURE; + } + + usleep(POOL_ACQUIRE_RETRY_MS * 1000); + continue; + } + // Try to find an available handle for(int i = 0; i < pool_size; i++) { @@ -462,8 +575,15 @@ static void release_pool_handle(int idx) if (idx >= 0 && idx < pool_size) { pool_entries[idx].handle_available = true; - active_requests--; - T2Info("Released curl handle = %d, active_requests = %d\n", idx, active_requests); + if (active_requests > 0) + { + active_requests--; + } + else + { + T2Warning("release_pool_handle called with active_requests already 0\n"); + } + T2Info("Released curl handle = %d, active_requests = %u\n", idx, active_requests); } else { diff --git a/source/protocol/http/multicurlinterface.h b/source/protocol/http/multicurlinterface.h index 2a1e506c..93e01bc9 100644 --- a/source/protocol/http/multicurlinterface.h +++ b/source/protocol/http/multicurlinterface.h @@ -43,6 +43,8 @@ T2ERROR init_connection_pool(); // New dedicated APIs for better separation of concerns T2ERROR http_pool_get(const char *url, char **response_data, bool enable_file_output); T2ERROR http_pool_post(const char *url, const char *payload); +T2ERROR http_pool_begin_sampling_window(unsigned int timeout_ms); +void http_pool_end_sampling_window(void); T2ERROR http_pool_cleanup(void);