 from __future__ import annotations
 
 from datetime import datetime
-from typing import Any, Dict, Generator, Iterable, List, Optional, Set
+from typing import Any, Dict, Generator, Iterable, List, Optional
 
 from pydantic.v1 import Field, ValidationError, parse_obj_as
 
     Term,
     Terms,
 )
-from pyatlan.utils import deep_get
 
 BY_TIMESTAMP = [SortItem("timestamp", order=SortOrder.ASCENDING)]
 
@@ -377,6 +376,7 @@ def __init__(
         log_entries: List[SearchLogEntry],
         aggregations: Dict[str, Aggregation],
         bulk: bool = False,
+        processed_log_entries_count: int = 0,
     ):
         self._client = client
         self._endpoint = SEARCH_LOG
@@ -390,7 +390,8 @@ def __init__(
         self._bulk = bulk
         self._first_record_creation_time = -2
         self._last_record_creation_time = -2
-        self._processed_log_entries: Set[str] = set()
+        self._duplicate_timestamp_page_count: int = 0
+        self._processed_log_entries_count: int = processed_log_entries_count
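+        # Running total of log entries parsed so far; _get_next_page_json()
+        # increments this as each page is parsed.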
 
     @property
     def count(self) -> int:
@@ -404,65 +405,15 @@ def current_page(self) -> List[SearchLogEntry]:
         """
         return self._log_entries
 
-    def _get_sl_unique_key(self, entity: SearchLogEntry) -> Optional[str]:
-        """
-        Returns a unique key for a `SearchLogEntry` by
-        combining `entity_guid` with the timestamp.
-
-        NOTE: This is necessary because the search log API
-        does not provide a unique identifier for logs.
-
-        :param: search log entry
-        :returns: unique key or None if no valid key is found
-        """
-        entity_guid = entity.entity_guids_all[0] if entity.entity_guids_all else None
-
-        # If entity_guid is not present, try to extract it from request_dsl; otherwise, return None
-        if not entity_guid:
-            terms = deep_get(
-                entity.request_dsl, "query.function_score.query.bool.filter.bool.must"
-            )
-            if not terms:
-                return None
-
-            if isinstance(terms, list):
-                for term in terms:
-                    if isinstance(term, dict) and term.get("term", {}).get("__guid"):
-                        entity_guid = term["term"]["__guid"]
-                        break
-            elif isinstance(terms, dict):
-                entity_guid = terms.get("term", {}).get("__guid")
-
-        return (
-            f"{entity_guid}:{entity.timestamp}"
-            if entity_guid and entity_guid != "undefined"
-            else None
-        )
-
     def next_page(self, start=None, size=None) -> bool:
         """
         Indicates whether there is a next page of results.
 
         :returns: True if there is a next page of results, otherwise False
         """
         self._start = start or self._start + self._size
-        is_bulk_search = (
-            self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
-        )
         if size:
             self._size = size
-
-        if is_bulk_search:
-            # Used in the "timestamp-based" paging approach
-            # to check if a search log with the unique key "_get_sl_unique_key()"
-            # has already been processed in a previous page of results.
-            # If it has, then exclude it from the current results;
-            # otherwise, we may encounter duplicate search log records.
-            self._processed_log_entries.update(
-                key
-                for entity in self._log_entries
-                if (key := self._get_sl_unique_key(entity))
-            )
         return self._get_next_page() if self._log_entries else False
 
     def _get_next_page(self):
@@ -501,8 +452,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
             return None
         try:
             self._log_entries = parse_obj_as(List[SearchLogEntry], raw_json["logs"])
+            self._processed_log_entries_count += len(self._log_entries)
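+            # Keep the running total of parsed entries in sync with this page.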
             if is_bulk_search:
-                self._filter_processed_entities()
                 self._update_first_last_record_creation_times()
             return raw_json
         except ValidationError as err:
@@ -514,6 +465,7 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
         """
         Adjusts the query to include timestamp filters for search log bulk extraction.
         """
+        self._criteria.dsl.from_ = 0
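+        # Default the offset to 0: the timestamp filter (appended below)
+        # normally advances the window; the duplicate-timestamp branch
+        # overrides this offset when needed.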
         rewritten_filters = []
         if isinstance(query, Bool):
             for filter_ in query.filter:
@@ -522,6 +474,9 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                 rewritten_filters.append(filter_)
 
         if self._first_record_creation_time != self._last_record_creation_time:
+            # If the first and last record creation times are different,
+            # reset _duplicate_timestamp_page_count to its initial value.
+            self._duplicate_timestamp_page_count = 0
             rewritten_filters.append(
                 self._get_paging_timestamp_query(self._last_record_creation_time)
             )
@@ -539,47 +494,31 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                 # in the DSL, append it to the Bool `filter`.
                 rewritten_filters.append(query)
             rewritten_query = Bool(filter=rewritten_filters)
-            self._criteria.dsl.from_ = 0
             self._criteria.dsl.query = rewritten_query
         else:
-            # Ensure that when switching to offset-based paging, if the first and last record timestamps are the same,
-            # we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query.
-            # Instead, ensure the search runs with only SortItem(field='__timestamp', order=<SortOrder.ASCENDING>).
-            # Failing to do so can lead to incomplete results (less than the approximate count) when running the search
-            # with a small page size.
-            if isinstance(query, Bool):
-                for filter_ in query.filter:
-                    if self._is_paging_timestamp_query(filter_):
-                        query.filter.remove(filter_)
-
-            # Always ensure that the offset is set to the length of the processed assets
-            # instead of the default (start + size), as the default may skip some assets
-            # and result in incomplete results (less than the approximate count)
-            self._criteria.dsl.from_ = len(self._processed_log_entries)
+            # If the first and last record creation times are the same,
+            # switch to offset-based pagination instead of timestamp-based
+            # pagination to fetch the next set of results without duplicates.
+            # A page multiplier skips already-processed records when
+            # consecutive pages share identical timestamps.
+            self._criteria.dsl.from_ = self._size * (
+                self._duplicate_timestamp_page_count + 1
+            )
+            self._criteria.dsl.size = self._size
+            self._duplicate_timestamp_page_count += 1
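+            # e.g., with _size=100: the next request starts at from_=100, the
+            # one after at from_=200, and so on, until a page with a newer
+            # timestamp resets the counter above.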
 
     @staticmethod
     def _get_paging_timestamp_query(last_timestamp: int) -> Query:
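+        # Strictly greater-than (gt, not gte): records created exactly at
+        # last_timestamp were already returned; pages consisting entirely of
+        # one timestamp are handled by the offset-based fallback instead.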
-        return Range(field="createdAt", gte=last_timestamp)
+        return Range(field="createdAt", gt=last_timestamp)
 
     @staticmethod
     def _is_paging_timestamp_query(filter_: Query) -> bool:
         return (
             isinstance(filter_, Range)
             and filter_.field == "createdAt"
-            and filter_.gte is not None
+            and filter_.gt is not None
         )
 
-    def _filter_processed_entities(self):
-        """
-        Remove log entries that have already been processed to avoid duplicates.
-        """
-        self._log_entries = [
-            entity
-            for entity in self._log_entries
-            if entity is not None
-            and self._get_sl_unique_key(entity) not in self._processed_log_entries
-        ]
-
     def _update_first_last_record_creation_times(self):
         self._first_record_creation_time = self._last_record_creation_time = -2
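A minimal sketch of the offset progression the duplicate-timestamp fallback implements, assuming a page size of 100 (the names here are illustrative, not part of the library):

size = 100
duplicate_timestamp_page_count = 0
for _ in range(3):  # three consecutive pages that all share one timestamp
    offset = size * (duplicate_timestamp_page_count + 1)
    duplicate_timestamp_page_count += 1
    print(offset)  # -> 100, 200, 300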