 import duckdb
 from utils.cache import get_cached_topics, save_cached_topics
 import os
+import psutil

 class TopicService:
     # Define the allowed terms for caching
@@ -19,8 +20,19 @@ def __init__(self):
         if os.path.exists(db_path):
             # Connect in read-only mode to avoid locking issues
             self.con = duckdb.connect(database=db_path, read_only=True)
-            self.con.execute("SET threads TO 2;")
-            self.con.execute("SET memory_limit TO '0.5GB';")
+
+            # Set conservative memory limits based on available system memory
+            available_memory = psutil.virtual_memory().available
+            memory_limit = min(available_memory * 0.3, 0.5 * 1024 * 1024 * 1024)  # Use 30% of available memory, max 0.5GB
+            self.con.execute(f"SET memory_limit TO '{int(memory_limit)}B'")
+
+            # Set conservative thread count
+            cpu_count = psutil.cpu_count(logical=False) or 1
+            thread_count = max(1, min(cpu_count, 2))  # Use at most 2 threads
+            self.con.execute(f"SET threads TO {thread_count}")
+
+            # Enable streaming for large queries
+            self.con.execute("SET enable_streaming TO true")
         else:
             raise FileNotFoundError(
                 f"Database not found at {db_path}. Please ensure the database file exists before running the application."
@@ -41,31 +53,28 @@ def process_topics(self, search_term: str):
4153 "cached" : True
4254 }
4355
44- # Get data from normalized tables in DuckDB
56+ # Use a more efficient query that filters early
4557 query = """
46- SELECT r.nameWithOwner, t.topic
47- FROM repos r
48- JOIN repo_topics t ON r.nameWithOwner = t.repo
58+ WITH filtered_repos AS (
59+ SELECT DISTINCT r.nameWithOwner
60+ FROM repos r
61+ JOIN repo_topics t ON r.nameWithOwner = t.repo
62+ WHERE LOWER(t.topic) = ?
63+ )
64+ SELECT t.topic, COUNT(*) as count
65+ FROM filtered_repos fr
66+ JOIN repo_topics t ON fr.nameWithOwner = t.repo
67+ WHERE LOWER(t.topic) != ?
68+ GROUP BY t.topic
69+ HAVING COUNT(*) > 2
70+ ORDER BY count DESC
4971 """
50- df = self .con .execute (query ).fetchdf ()
51-
52- # Group topics by repo into a list
53- grouped = df .groupby ("nameWithOwner" )["topic" ].apply (list ).reset_index ()
54- grouped .columns = ["nameWithOwner" , "topics" ]
55-
56- # Filter repos based on search term in topics
57- filtered_df = grouped [grouped ["topics" ].apply (lambda x : search_term in [t .lower () for t in x ])]
58-
59- # Count all co-occurring topics
60- all_topics = [topic for topics in filtered_df ["topics" ] for topic in topics ]
61- topic_counts = Counter ([t .lower () for t in all_topics ])
62-
63- # Remove the searched topic itself
64- topic_counts .pop (search_term , None )
65-
66- # Format results and sort, only including topics with count > 2
67- topics = [{"name" : name , "count" : count } for name , count in topic_counts .items () if count > 2 ]
68- topics = sorted (topics , key = lambda x : x ["count" ], reverse = True )
+
+        # Execute the parameterized query, binding the search term to both placeholders
+        result = self.con.execute(query, [search_term, search_term]).fetchall()
+
+        # Convert results to the expected format
+        topics = [{"name": name.lower(), "count": count} for name, count in result]

         # Only cache results for allowed terms
         if search_term in self.CACHEABLE_TERMS:
@@ -83,4 +92,8 @@ def process_topics(self, search_term: str):
                 "success": False,
                 "error": str(e),
                 "message": "An error occurred while processing the request"
-            }
+            }
+        finally:
+            # Force garbage collection after each request
+            import gc
+            gc.collect()
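
A minimal standalone sketch of the new connection settings, for reference only. It assumes psutil is installed and that DuckDB accepts unit-suffixed values such as '512MB' for memory_limit; the in-memory connection is purely illustrative (the service itself opens its database file read-only).

import duckdb
import psutil

# Cap DuckDB at 30% of currently available memory, never more than 512MB.
available = psutil.virtual_memory().available
limit_mb = max(1, int(min(available * 0.3, 512 * 1024 * 1024)) // (1024 * 1024))

# Use at most two physical cores, falling back to one if the count is unknown.
threads = max(1, min(psutil.cpu_count(logical=False) or 1, 2))

con = duckdb.connect(database=":memory:")  # illustration only
con.execute(f"SET memory_limit = '{limit_mb}MB'")
con.execute(f"SET threads = {threads}")
print(con.execute("SELECT current_setting('memory_limit'), current_setting('threads')").fetchall())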