1616#
1717
1818"""This module defines the basic MapToFields operation."""
19- import atexit
20- import importlib
2119import itertools
22- import os
23- import queue
2420import re
25- import sys
26- import threading
27- import uuid
2821from collections import abc
2922from collections .abc import Callable
3023from collections .abc import Collection
6053from apache_beam .yaml .yaml_errors import maybe_with_exception_handling_transform_fn
6154from apache_beam .yaml .yaml_provider import dicts_to_rows
6255
56+ # Import js2py package if it exists
57+ try :
58+ import js2py
59+ from js2py .base import JsObjectWrapper
60+ except ImportError :
61+ js2py = None
62+ JsObjectWrapper = object
63+
6364_str_expression_fields = {
6465 'AssignTimestamps' : 'timestamp' ,
6566 'Filter' : 'keep' ,
@@ -177,6 +178,20 @@ def _check_mapping_arguments(
177178 raise ValueError (f'{ transform_name } cannot specify "name" without "path"' )
178179
179180
181+ # js2py's JsObjectWrapper object has a self-referencing __dict__ property
182+ # that cannot be pickled without implementing the __getstate__ and
183+ # __setstate__ methods.
184+ class _CustomJsObjectWrapper (JsObjectWrapper ):
185+ def __init__ (self , js_obj ):
186+ super ().__init__ (js_obj .__dict__ ['_obj' ])
187+
188+ def __getstate__ (self ):
189+ return self .__dict__ .copy ()
190+
191+ def __setstate__ (self , state ):
192+ self .__dict__ .update (state )
193+
194+
180195# TODO(yaml) Improve type inferencing for JS UDF's
181196def py_value_to_js_dict (py_value ):
182197 if ((isinstance (py_value , tuple ) and hasattr (py_value , '_asdict' )) or
@@ -190,181 +205,85 @@ def py_value_to_js_dict(py_value):
190205 return py_value
191206
192207
193- class PythonMonkeyDispatcher :
194- """Dispatcher for executing JavaScript code using pythonmonkey.
195-
196- This class manages a worker thread to execute JavaScript, ensuring that
197- pythonmonkey is only imported and used within that thread. It also handles
198- process shutdown carefully to avoid segmentation faults known to occur
199- when pythonmonkey is present during standard Python interpreter finalization.
200- """
201- def __init__ (self ):
202- self ._req_queue = queue .Queue ()
203- self ._resp_events = {}
204- self ._resp_data = {}
205- self ._lock = threading .Lock ()
206- self ._thread = threading .Thread (target = self ._worker , daemon = True )
207- self ._started = False
208- # Register the stop method to be called on exit.
209- # atexit handlers are executed in LIFO order. By registering at import time,
210- # we ensure this handler runs last, allowing other cleanup handlers
211- # (registered later) to execute first.
212- atexit .register (self .stop )
213-
214- def start (self ):
215- with self ._lock :
216- if not self ._started :
217- self ._thread .start ()
218- self ._started = True
219-
220- def stop (self ):
221- # This method is called on process exit.
222- if not self ._started :
223- return
224- # Flush standard streams before forced exit to avoid data loss.
225- try :
226- sys .stdout .flush ()
227- sys .stderr .flush ()
228- except Exception :
229- pass
230- # Force an immediate exit to avoid a segmentation fault that occurs with
231- # pythonmonkey during standard interpreter finalization.
232- # Since this runs as one of the last atexit handlers (due to import-time
233- # registration), most other cleanup should have already completed.
234- os ._exit (0 )
235-
236- def _worker (self ):
237- try :
238- import pythonmonkey as pm
239- except ImportError :
240- pm = None
241-
242- self ._pm = pm
243- self ._cache = {}
244-
245- while True :
246- req = self ._req_queue .get ()
247- if req is None :
248- break
249-
250- req_id , type_str , payload = req
251- res = None
252- is_err = False
253- try :
254- if self ._pm is None :
255- raise ImportError (
256- "PythonMonkey not installed or failed to import in worker thread."
257- )
258-
259- if type_str == 'exec' :
260- source , row = payload
261- if source not in self ._cache :
262- self ._cache [source ] = self ._pm .eval (f"({ source } )" )
263- func = self ._cache [source ]
264- res = func (row )
265- except Exception as e :
266- res = e
267- is_err = True
268-
269- with self ._lock :
270- if req_id in self ._resp_events :
271- self ._resp_data [req_id ] = (is_err , res )
272- self ._resp_events [req_id ].set ()
273-
274- def eval_and_run (self , source , row ):
275- if not self ._started :
276- self .start ()
277-
278- req_id = str (uuid .uuid4 ())
279- event = threading .Event ()
280- with self ._lock :
281- self ._resp_events [req_id ] = event
282-
283- self ._req_queue .put ((req_id , 'exec' , (source , row )))
284- event .wait ()
285-
286- with self ._lock :
287- is_err , result = self ._resp_data .pop (req_id )
288- del self ._resp_events [req_id ]
289-
290- if is_err :
291- raise result
292- return result
293-
294-
295- _pythonmonkey_dispatcher = PythonMonkeyDispatcher ()
296-
297-
298- class JavaScriptCallable :
299- def __init__ (self , source , name = None ):
300- self ._source = source
301- self ._name = name
302-
303- def __call__ (self , row ):
304- # Check for pythonmonkey availability lazily (on first call)
305- if importlib .util .find_spec ("pythonmonkey" ) is None :
306- raise RuntimeError (
307- "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
308- "to use JavaScript mapping functions." )
309-
310- row_as_dict = py_value_to_js_dict (row )
311- try :
312- # If we have a name, it means we evaluated a file and need to call
313- # a specific function.
314- # Dispatcher expects a self-contained source/expression.
315- if self ._name :
316- # Wrap: (function() { <source>; return <name>; })()
317- effective_source = (
318- f"(function() {{ { self ._source } ; return { self ._name } ; }})()" )
319- else :
320- # Expression/Callable case: Wrap in parens to be safe
321- effective_source = f"({ self ._source } )"
322-
323- js_result = _pythonmonkey_dispatcher .eval_and_run (
324- effective_source , row_as_dict )
325-
326- except Exception as exn :
327- raise RuntimeError (
328- f"Error evaluating javascript expression: { exn } " ) from exn
329- return dicts_to_rows (_finalize_js_result (js_result ))
330-
331-
332- def _finalize_js_result (obj ):
333- """Coerce pythonmonkey objects to native Python objects (specifically
334- strings).
335- """
336- if isinstance (obj , str ):
337- return str (obj )
338- if isinstance (obj , list ):
339- return [_finalize_js_result (x ) for x in obj ]
340- if isinstance (obj , dict ):
341- return {k : _finalize_js_result (v ) for k , v in obj .items ()}
342- return obj
343-
344-
208+ # TODO(yaml) Consider adding optional language version parameter to support
209+ # ECMAScript 5 and 6
345210def _expand_javascript_mapping_func (
346211 original_fields , expression = None , callable = None , path = None , name = None ):
347212
348- if importlib .util .find_spec ("pythonmonkey" ) is None :
213+ # Check for installed js2py package
214+ if js2py is None :
349215 raise ValueError (
350- "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
351- "to use JavaScript mapping functions." )
216+ "Javascript mapping functions are not supported on"
217+ " Python 3.12 or later." )
218+
219+ # import remaining js2py objects
220+ from js2py import base
221+ from js2py .constructors import jsdate
222+ from js2py .internals import simplex
223+
224+ js_array_type = (
225+ base .PyJsArray ,
226+ base .PyJsArrayBuffer ,
227+ base .PyJsInt8Array ,
228+ base .PyJsUint8Array ,
229+ base .PyJsUint8ClampedArray ,
230+ base .PyJsInt16Array ,
231+ base .PyJsUint16Array ,
232+ base .PyJsInt32Array ,
233+ base .PyJsUint32Array ,
234+ base .PyJsFloat32Array ,
235+ base .PyJsFloat64Array )
236+
237+ def _js_object_to_py_object (obj ):
238+ if isinstance (obj , (base .PyJsNumber , base .PyJsString , base .PyJsBoolean )):
239+ return base .to_python (obj )
240+ elif isinstance (obj , js_array_type ):
241+ return [_js_object_to_py_object (value ) for value in obj .to_list ()]
242+ elif isinstance (obj , jsdate .PyJsDate ):
243+ return obj .to_utc_dt ()
244+ elif isinstance (obj , (base .PyJsNull , base .PyJsUndefined )):
245+ return None
246+ elif isinstance (obj , base .PyJsError ):
247+ raise RuntimeError (obj ['message' ])
248+ elif isinstance (obj , base .PyJsObject ):
249+ return {
250+ key : _js_object_to_py_object (value ['value' ])
251+ for (key , value ) in obj .own .items ()
252+ }
253+ elif isinstance (obj , base .JsObjectWrapper ):
254+ return _js_object_to_py_object (obj ._obj )
255+
256+ return obj
352257
353258 if expression :
354259 source = '\n ' .join (['function(__row__) {' ] + [
355260 f' { name } = __row__.{ name } '
356261 for name in original_fields if name in expression
357262 ] + [' return (' + expression + ')' ] + ['}' ])
358- return JavaScriptCallable ( source )
263+ js_func = _CustomJsObjectWrapper ( js2py . eval_js ( source ) )
359264
360265 elif callable :
361- return JavaScriptCallable ( callable )
266+ js_func = _CustomJsObjectWrapper ( js2py . eval_js ( callable ) )
362267
363268 else :
364269 if not path .endswith ('.js' ):
365270 raise ValueError (f'File "{ path } " is not a valid .js file.' )
366271 udf_code = FileSystems .open (path ).read ().decode ()
367- return JavaScriptCallable (udf_code , name = name )
272+ js = js2py .EvalJs ()
273+ js .eval (udf_code )
274+ js_func = _CustomJsObjectWrapper (getattr (js , name ))
275+
276+ def js_wrapper (row ):
277+ row_as_dict = py_value_to_js_dict (row )
278+ try :
279+ js_result = js_func (row_as_dict )
280+ except simplex .JsException as exn :
281+ raise RuntimeError (
282+ f"Error evaluating javascript expression: "
283+ f"{ exn .mes ['message' ]} " ) from exn
284+ return dicts_to_rows (_js_object_to_py_object (js_result ))
285+
286+ return js_wrapper
368287
369288
370289def _expand_python_mapping_func (
0 commit comments