Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/dcautil/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ libdcautil_la_CFLAGS += -fPIC -I${PKG_CONFIG_SYSROOT_DIR}$(includedir)/dbus-1.0
-I${top_srcdir}/source/ccspinterface \
-I${top_srcdir}/source/utils \
-I${top_srcdir}/source/bulkdata \
-I${top_srcdir}/source/protocol/http \
-I${PKG_CONFIG_SYSROOT_DIR}$(includedir)/ccsp

Comment on lines 35 to 39
libdcautil_la_DEPENDENCIES = ${top_builddir}/source/ccspinterface/libccspinterface.la ${top_builddir}/source/utils/libt2utils.la
27 changes: 27 additions & 0 deletions source/dcautil/dca.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "t2log_wrapper.h"
#include "t2common.h"
#include "busInterface.h"
#include "multicurlinterface.h"


static bool check_rotated_logs = false; // using this variable to indicate whether it needs to check the rotated logs or not . Initialising it with false.
Expand All @@ -59,6 +60,12 @@ static bool firstreport_after_bootup = false; // the rotated logs check should r
#define BUFFER_SIZE 4096 // TODO fine tune this value based on the size of the data
#define LARGE_FILE_THRESHOLD 1000000 // 1MB
#define MAX_TIMESTAMP_LENGTH 24

/*
* CPU contention avoidance window:
* prevent overlap between top/process sampling and telemetry curl work.
*/
#define DCA_SAMPLING_CURL_WAIT_TIMEOUT_MS 2000
/**
* @addtogroup DCA_TYPES
* @{
Expand Down Expand Up @@ -256,6 +263,7 @@ static time_t extractUnixTimestamp(const char* line_start, size_t line_length)
int processTopPattern(char* profileName, Vector* topMarkerList, int profileExecCounter)
{
T2Debug("%s ++in\n", __FUNCTION__);
bool isSamplingActive = false;
if(profileName == NULL || topMarkerList == NULL)
{
T2Error("Invalid arguments for %s\n", __FUNCTION__);
Comment on lines 265 to 269
Expand Down Expand Up @@ -314,6 +322,18 @@ int processTopPattern(char* profileName, Vector* topMarkerList, int profileExec

for (; var < vCount; ++var) // Loop of marker list starts here
{
if(!isSamplingActive)
{
/*
* Enter sampling window so new curl work is deferred while
* top/process markers are being collected.
*/
if(http_pool_begin_sampling_window(DCA_SAMPLING_CURL_WAIT_TIMEOUT_MS) == T2ERROR_SUCCESS)
{
isSamplingActive = true;
}
}
Comment on lines 323 to +335
Comment on lines 323 to +335

TopMarker* topMarkerObj = (TopMarker*) Vector_At(topMarkerList, var);
if (!topMarkerObj || !topMarkerObj->logFile || !topMarkerObj->searchString || !topMarkerObj->markerName)
{
Expand Down Expand Up @@ -370,6 +390,13 @@ int processTopPattern(char* profileName, Vector* topMarkerList, int profileExec

}


if(isSamplingActive)
{
/* Leave sampling window and allow deferred curl work to proceed. */
http_pool_end_sampling_window();
}

#if !defined(ENABLE_RDKC_SUPPORT) && !defined(ENABLE_RDKB_SUPPORT)
removeTopOutput(filename);
#endif
Expand Down
124 changes: 122 additions & 2 deletions source/protocol/http/multicurlinterface.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <sys/stat.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <openssl/err.h>
#include <openssl/crypto.h>
#include "multicurlinterface.h"
Expand Down Expand Up @@ -72,8 +73,11 @@
#define HTTP_RESPONSE_FILE "/tmp/httpOutput.txt"
#define POOL_ACQUIRE_TIMEOUT_SEC 35
#define POOL_ACQUIRE_RETRY_MS 100
/* Maximum wait for in-flight requests to drain when DCA sampling starts. */
#define SAMPLE_WINDOW_DRAIN_RETRY_MS 100
Comment on lines +76 to +77
static bool pool_initialized = false;
static bool pool_shutting_down = false;
static unsigned int sampling_window_refcount = 0;

// pool_mutex protects pool state and synchronizes access to pool entries
static pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER;
Expand All @@ -95,6 +99,95 @@ static struct curl_slist *post_headers = NULL; // Shared POST headers
static int pool_size = 0; // Number of pool entries
static unsigned int active_requests = 0; // Number of in-flight curl_easy_perform() calls

/*
* CPU contention avoidance window:
* while DCA top sampling is active, defer new curl acquisitions to avoid
* running two CPU-intensive telemetry paths in parallel.
*/
T2ERROR http_pool_begin_sampling_window(unsigned int timeout_ms)
{
struct timespec start_time, current_time;
bool timeout_reached = false;

Comment on lines +107 to +111
if (clock_gettime(CLOCK_MONOTONIC, &start_time) != 0)
{
T2Error("clock_gettime failed for start_time in %s: %s\n", __FUNCTION__, strerror(errno));
return T2ERROR_FAILURE;
}

pthread_mutex_lock(&pool_mutex);
sampling_window_refcount++;
pthread_mutex_unlock(&pool_mutex);

Comment on lines +107 to +121
while (1)
{
unsigned int pending = 0;

pthread_mutex_lock(&pool_mutex);
pending = active_requests;
pthread_mutex_unlock(&pool_mutex);

if (pending == 0)
{
break;
}

if (timeout_ms > 0)
{
unsigned int elapsed_ms = 0;

if (clock_gettime(CLOCK_MONOTONIC, &current_time) != 0)
{
T2Error("clock_gettime failed for current_time in %s: %s\n", __FUNCTION__, strerror(errno));
break;
}

long sec = current_time.tv_sec - start_time.tv_sec;
long nsec = current_time.tv_nsec - start_time.tv_nsec;
if (nsec < 0)
{
sec--;
nsec += 1000000000L;
}
elapsed_ms = (unsigned int)(sec * 1000L + (nsec / 1000000L));

if (elapsed_ms >= timeout_ms)
{
timeout_reached = true;
break;
}
}

usleep(SAMPLE_WINDOW_DRAIN_RETRY_MS * 1000);
}

if (timeout_reached)
{
unsigned int pending_requests = 0;
pthread_mutex_lock(&pool_mutex);
pending_requests = active_requests;
pthread_mutex_unlock(&pool_mutex);
T2Warning("Sampling window started with %u active request(s) still in flight after %u ms\n",
pending_requests, timeout_ms);
}

return T2ERROR_SUCCESS;
}

void http_pool_end_sampling_window(void)
{
pthread_mutex_lock(&pool_mutex);
if (sampling_window_refcount > 0)
{
sampling_window_refcount--;
}
else
{
T2Warning("http_pool_end_sampling_window called with refcount already 0\n");
}
pthread_mutex_unlock(&pool_mutex);
}

#ifdef LIBRDKCERTSEL_BUILD
#if defined(ENABLE_RED_RECOVERY_SUPPORT)
bool isStateRedEnabled(void)
Expand Down Expand Up @@ -413,6 +506,26 @@ static T2ERROR acquire_pool_handle(CURL **easy, int *idx)
return T2ERROR_FAILURE;
}

if (sampling_window_refcount > 0)
{
pthread_mutex_unlock(&pool_mutex);

if (clock_gettime(CLOCK_MONOTONIC, &current_time) != 0)
{
T2Error("clock_gettime failed for current_time: %s\n", strerror(errno));
return T2ERROR_FAILURE;
}

if ((current_time.tv_sec - start_time.tv_sec) >= POOL_ACQUIRE_TIMEOUT_SEC)
{
T2Error("Timeout waiting for sampling window to complete, treating as upload failure\n");
return T2ERROR_FAILURE;
}

usleep(POOL_ACQUIRE_RETRY_MS * 1000);
continue;
}
Comment on lines +509 to +527

// Try to find an available handle
for(int i = 0; i < pool_size; i++)
{
Expand Down Expand Up @@ -462,8 +575,15 @@ static void release_pool_handle(int idx)
if (idx >= 0 && idx < pool_size)
{
pool_entries[idx].handle_available = true;
active_requests--;
T2Info("Released curl handle = %d, active_requests = %d\n", idx, active_requests);
if (active_requests > 0)
{
active_requests--;
}
else
{
T2Warning("release_pool_handle called with active_requests already 0\n");
}
T2Info("Released curl handle = %d, active_requests = %u\n", idx, active_requests);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions source/protocol/http/multicurlinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ T2ERROR init_connection_pool();
// New dedicated APIs for better separation of concerns
T2ERROR http_pool_get(const char *url, char **response_data, bool enable_file_output);
T2ERROR http_pool_post(const char *url, const char *payload);
T2ERROR http_pool_begin_sampling_window(unsigned int timeout_ms);
void http_pool_end_sampling_window(void);

T2ERROR http_pool_cleanup(void);

Expand Down
Loading