1414import traceback
1515from subprocess import Popen , PIPE
1616
17+ # --------------------------------------------------------------------------------------
18+ # Parallel processing-related functions
19+ # --------------------------------------------------------------------------------------
20+
21+
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Patch joblib so ``Parallel`` progress is reported through *tqdm_object*.

    Adapted from https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/41815007

    Parameters
    ----------
    tqdm_object : tqdm.tqdm
        The progress bar to advance as joblib completes tasks.

    Yields
    ------
    tqdm.tqdm
        The same progress bar, wired to joblib's progress reporting while
        the ``with`` body runs.
    """

    def _report_progress(self):
        # Advance the bar by however many tasks finished since the last call.
        newly_done = self.n_completed_tasks - tqdm_object.n
        if newly_done > 0:
            tqdm_object.update(n=newly_done)

    saved_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = _report_progress
    try:
        yield tqdm_object
    finally:
        # Always undo the monkey-patch and finalize the bar, even on error.
        joblib.parallel.Parallel.print_progress = saved_print_progress
        tqdm_object.close()
40+
41+
def run_algorithm(setup_dict):
    """Run an algorithm in parallel.

    Run an algorithm in parallel. This is supposed to be used in conjunction
    with joblib.Parallel to run different IM algorithms at the same time.

    Parameters
    ----------
    setup_dict : dict
        A dictionary containing the following keys:
        * "function" : the function to run
        * "algo_name" : name of the algorithm represented by the function
        * "args" : positional arguments for that function
        * "kwargs" : keyword arguments for that function

    Returns
    -------
    result_dict : dict
        A dictionary containing the following keys (an empty dict is
        returned if the function raised):
        * "result" : the value returned by the function
        * "algo_name" : name of the algorithm represented by the function
        * "kwargs" : the keyword arguments the function was called with

    Examples
    --------
    >>> setup_array = []
    ... setup_array.append(
    ...     {
    ...         "algo_name": "timlinucb",
    ...         "function": timlinucb_parallel_t,
    ...         "args": [DATASET, DATASET_FEATS, DATASET_TIMES, DATASET_NODES],
    ...         "kwargs": {
    ...             "num_seeds": NUM_SEEDS_TO_FIND,
    ...             "num_repeats_oim": OPTIMAL_NUM_REPEATS_OIM_TLU,
    ...             "num_repeats_oim_reward": OPTIMAL_NUM_REPEATS_REW_TLU,
    ...             "sigma": OPTIMAL_SIGMA_TLU,
    ...             "c": OPTIMAL_C_TLU,
    ...             "epsilon": OPTIMAL_EPS_TLU,
    ...         },
    ...     }
    ... )
    ... results_array = joblib.Parallel(n_jobs=len(setup_array))(
    ...     joblib.delayed(run_algorithm)(setup_dict) for setup_dict in setup_array
    ... )
    """
    try:
        result_dict = {
            "result": setup_dict.get("function")(
                *setup_dict.get("args"), **setup_dict.get("kwargs")
            ),
            "algo_name": setup_dict.get("algo_name"),
            "kwargs": setup_dict.get("kwargs"),
        }
    except Exception as e:
        # Best-effort: report the failure and the offending setup, but do not
        # bring down the whole parallel batch -- signal failure with {}.
        print(e)
        print(setup_dict)
        traceback.print_exc()
        return {}
    return result_dict
104+
105+
106+ def _run_timlinucb_parallel (setup_dict ):
107+ """ Run IMLinUCB in parallel
108+
109+ This is a helper function used by timlinucb_parallel_oim from timlinucb.py to
110+ run multiple IMLinUCB instances at the same time.
111+
112+ Parameters
113+ ----------
114+ setup_dict : dict
115+ A dictionary containing the following keys:
116+ * "function" : a function that we have to run
117+ * "time" : time t of the current OIM execution
118+ * "args" : arguments for that functions
119+ * "kwargs" : keyword arguments for that functions
120+
121+ Returns
122+ -------
123+ result_dict : dict
124+ A dictionary containing the following:
125+ * "results" : results generated by IMLinUCB
126+ * "time" : time t of the current OIM execution
127+
128+
129+ """
130+ result = setup_dict ["function" ](* setup_dict ["args" ], ** setup_dict ["kwargs" ])
131+ result ["time" ] = setup_dict ["time" ]
132+ return result
133+
134+
135+ # --------------------------------------------------------------------------------------
136+ # IC-related functions
137+ # --------------------------------------------------------------------------------------
138+
17139
def get_avg_reward(df, seeds, num_repeats):
    """Simulate the influence propagation using the IC model.

    Parameters
    ----------
    df : pandas.DataFrame
        The graph we run the IC on, in the form of a DataFrame. A row
        represents one edge in the graph, with columns being named "source",
        "target", "probab". "probab" column contains the activation
        probability.
    seeds : list, pandas.Series
        A list of the nodes to start propagating from.
    num_repeats : int
        Specifies how many times we want to simulate the propagation with IC.

    Returns
    -------
    avg_reward : float
        Number showing how many nodes were influenced on average.
    """
    # One IC simulation per repeat; the reward of a run is the number of
    # nodes it reached.
    rewards = [run_ic_nodes(df, seeds).shape[0] for _ in range(num_repeats)]
    return np.average(rewards)
23163
24164
def get_stats_reward(df, seeds, num_repeats):
    """Simulate the influence propagation using the IC model.

    Parameters
    ----------
    df : pandas.DataFrame
        The graph we run the IC on, in the form of a DataFrame. A row
        represents one edge in the graph, with columns being named "source",
        "target", "probab". "probab" column contains the activation
        probability.
    seeds : list, pandas.Series
        A list of the nodes to start propagating from.
    num_repeats : int
        Specifies how many times we want to simulate the propagation with IC.

    Returns
    -------
    avg_reward : float
        Number showing how many nodes were influenced on average.
    std_reward : float
        Standard deviation of avg_reward.
    """
    # One IC simulation per repeat; the reward of a run is the number of
    # nodes it reached.
    rewards = [run_ic_nodes(df, seeds).shape[0] for _ in range(num_repeats)]
    return np.average(rewards), np.std(rewards)
30190
31191
32192def run_ic_eff (df_graph , seed_nodes ):
33- """ Runs independent cascade model.
34- Input: df_g -- a dataframe representing the graph (with the probabilities)
35- S -- initial set of vertices
36- tracking -- whether we want to check for active/observed nodes
37- Output: T -- resulted influenced set of vertices (including S)
193+ """ Simulate the influence propagation using the IC model
194+
195+ Parameters
196+ ----------
197+ df_graph : pandas.DataFrame
198+ The graph we run the IC on, in the form of a DataFrame. A row represents one
199+ edge in the graph, with columns being named "source", "target", "probab".
200+ "probab" column contains the activation probability.
201+ seed_nodes : list, pandas.Series
202+ A list of the nodes to start propagating from.
203+
204+ Returns
205+ -------
206+ results : tuple
207+ A tuple of the following numpy arrays
208+ - Affected nodes
209+ - Activated edges
210+ - Observed edges
211+
38212 """
39213 affected_nodes = deepcopy (seed_nodes ) # copy already selected nodes
40214 activated_edges = []
@@ -55,11 +229,21 @@ def run_ic_eff(df_graph, seed_nodes):
55229
56230
57231def run_ic_nodes (df_graph , seed_nodes ):
58- """ Runs independent cascade model.
59- Input: df_g -- a dataframe representing the graph (with the probabilities)
60- S -- initial set of vertices
61- tracking -- whether we want to check for active/observed nodes
62- Output: T -- resulted influenced set of vertices (including S)
232+ """ Simulate the influence propagation using the IC model
233+
234+ Parameters
235+ ----------
236+ df_graph : pandas.DataFrame
237+ The graph we run the IC on, in the form of a DataFrame. A row represents one
238+ edge in the graph, with columns being named "source", "target", "probab".
239+ "probab" column contains the activation probability.
240+ seed_nodes : list, pandas.Series
241+ A list of the nodes to start propagating from.
242+
243+ Returns
244+ -------
245+ affected_nodes : numpy.array
246+ Nodes influenced by propagating the seed nodes.
63247 """
64248 affected_nodes = deepcopy (seed_nodes ) # copy already selected nodes
65249 df_graph ["activated" ] = df_graph ["probab" ].apply (lambda x : random .random () <= x )
@@ -75,6 +259,11 @@ def run_ic_nodes(df_graph, seed_nodes):
75259 return np .array (affected_nodes )
76260
77261
262+ # --------------------------------------------------------------------------------------
263+ # TIM-related functions
264+ # --------------------------------------------------------------------------------------
265+
266+
78267def tim (
79268 df ,
80269 num_nodes ,
@@ -233,46 +422,3 @@ def tim_t_parallel(
233422 }
234423 )
235424 return pd .DataFrame (results )
236-
237-
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager that reports joblib.Parallel progress into *tqdm_object*.

    Adapted from https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/41815007
    """

    def _patched_print_progress(self):
        # Only update when joblib has completed more tasks than the bar shows.
        if self.n_completed_tasks > tqdm_object.n:
            tqdm_object.update(n=self.n_completed_tasks - tqdm_object.n)

    previous = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = _patched_print_progress
    try:
        yield tqdm_object
    finally:
        # Restore joblib's own reporter and close the bar unconditionally.
        joblib.parallel.Parallel.print_progress = previous
        tqdm_object.close()
256-
257-
def run_algorithm(setup_dict):
    """Run the algorithm described by *setup_dict*; return {} on failure.

    Expects the keys "function", "algo_name", "args" and "kwargs"; returns a
    dict with "result", "algo_name" and "kwargs" on success.
    """
    try:
        output = setup_dict.get("function")(
            *setup_dict.get("args"), **setup_dict.get("kwargs")
        )
        result_dict = {
            "result": output,
            "algo_name": setup_dict.get("algo_name"),
            "kwargs": setup_dict.get("kwargs"),
        }
    except Exception as err:
        # Report the failure but keep the parallel batch alive.
        print(err)
        print(setup_dict)
        traceback.print_exc()
        return {}
    return result_dict
273-
274-
275- def _run_timlinucb_parallel (setup_dict ):
276- result = setup_dict ["function" ](* setup_dict ["args" ], ** setup_dict ["kwargs" ])
277- result ["time" ] = setup_dict ["time" ]
278- return result
0 commit comments