1717
1818import Energistics .Etp .v12 .Datatypes .AnyArray ;
1919import Energistics .Etp .v12 .Datatypes .AnyLogicalArrayType ;
20- import Energistics .Etp .v12 .Datatypes .ArrayOfFloat ;
2120import Energistics .Etp .v12 .Datatypes .DataArrayTypes .*;
2221import Energistics .Etp .v12 .Datatypes .Object .*;
2322import Energistics .Etp .v12 .Protocol .DataArray .*;
2827import Energistics .Etp .v12 .Protocol .Store .PutDataObjects ;
2928import com .geosiris .etp .communication .Message ;
3029import com .geosiris .etp .websocket .ETPClient ;
31- import org .apache .avro .reflect .MapEntry ;
32- import org .apache .avro .specific .SpecificRecordBase ;
3330import org .apache .logging .log4j .LogManager ;
3431import org .apache .logging .log4j .Logger ;
3532import org .w3c .dom .Document ;
3633
3734import java .io .File ;
3835import java .io .FileNotFoundException ;
39- import java .lang .reflect .InvocationTargetException ;
4036import java .nio .charset .StandardCharsets ;
4137import java .util .*;
4238import java .util .stream .Collectors ;
43- import java .util .stream .IntStream ;
4439
4540public class ETPHelper {
4641 public static final Logger logger = LogManager .getLogger (ETPHelper .class );
@@ -338,7 +333,6 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
338333 logger .debug ("\t [{}]" , String .join (",\n " , subArrays .keySet ()));
339334
340335 assert subArrays .size () + fullArrays .stream ().map (x -> x .r ().size ()).reduce (0 , (a ,b ) -> a + b ) == datasets_paths .size ();
341- // if(true) return result;
342336
343337 List <Long > msgIdsToWait = new ArrayList <>();
344338 for (Pair <Long , List <String >> fk : fullArrays ){
@@ -360,30 +354,25 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
360354 Map <Long , String > msgIdsToWaitSubArray = new HashMap <>();
361355 for (Map .Entry <String , DataArrayMetadata > e_subArray : subArrays .entrySet ()){
362356 List <Long > dimensions = e_subArray .getValue ().getDimensions ();
363- long thisSize = e_subArray .getValue ().getDimensions ().stream ().reduce (1L , (a , b ) -> a * b ) * eltSize ;
364357 Map <CharSequence , GetDataSubarraysType > mapSubArr = new HashMap <>();
365358
366- long nbSplit = thisSize * eltSize / maxMsgSize ;
367- for (int i =0 ; i <=nbSplit ; i ++) {
368- long sub_start = ((i ) * maxMsgSize / eltSize );
369- long sub_stop = Math .min ((i + 1 ) * maxMsgSize / eltSize , thisSize / eltSize );
370- long sub_size = sub_stop - sub_start ;
359+ long startLineIdx = 0 ;
360+ long endLineIdx = 0 ;
361+ long sizeOfLine = dimensions .size () > 1 ? dimensions .subList (1 , dimensions .size ()).stream ().reduce (1L , (a , b ) -> a * b ) : 1 ;
362+ long nbLinePerMsg = (maxMsgSize - securitySize ) / (sizeOfLine * eltSize );
363+ while (endLineIdx < dimensions .get (0 )){
364+ endLineIdx = Math .min (startLineIdx + nbLinePerMsg , dimensions .get (0 ));
371365 List <Long > counts = new ArrayList <>();
372366 List <Long > starts = new ArrayList <>();
373- List <Long > dim_reversed = new ArrayList <>(dimensions );
374- Collections .reverse (dim_reversed );
375- long tmp_sub_start = sub_start ;
376- for (int di =0 ; di < dimensions .size (); di ++){
377- if (di < dimensions .size () - 1 ) {
378- Long lineSize = dimensions .subList (di + 1 , dimensions .size ()).stream ().reduce (1L , (subtotal , element ) -> subtotal * element );
379- starts .add (tmp_sub_start / lineSize );
380- tmp_sub_start -= starts .get (di ) * lineSize ;
381- counts .add (sub_size /lineSize );
382- }else {
383- starts .add (tmp_sub_start );
384- counts .add (Math .min (dimensions .get (di ), sub_size ));
385- }
367+
368+ counts .add (endLineIdx - startLineIdx );
369+ starts .add (startLineIdx );
370+
371+ for (int i =1 ; i <dimensions .size (); i ++){
372+ counts .add (dimensions .get (i ));
373+ starts .add (0L );
386374 }
375+ logger .debug ("SubArr : Counts {} : Starts {}" , counts , starts );
387376 mapSubArr .put (String .valueOf (mapSubArr .size ()),
388377 GetDataSubarraysType .newBuilder ()
389378 .setCounts (counts )
@@ -394,18 +383,20 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
394383 .build ()
395384 ).build ()
396385 );
397- }
398- GetDataSubarrays gdsa = GetDataSubarrays .newBuilder ()
386+ GetDataSubarrays gdsa = GetDataSubarrays .newBuilder ()
399387 .setDataSubarrays (mapSubArr ).build ();
400- long msgId = client .send (gdsa );
401- msgIdsToWaitSubArray .put (msgId , e_subArray .getKey ());
388+ long msgId = client .send (gdsa );
389+ msgIdsToWaitSubArray .put (msgId , e_subArray .getKey ());
390+
391+ startLineIdx = startLineIdx + nbLinePerMsg ;
392+ }
402393 }
403394
404- Map <String , List <GetDataSubarraysResponse >> subArrayResps = new HashMap <>();
395+ Map <String , List <Message >> subArrayResps = new HashMap <>();
405396
406397 for (Map .Entry <Long , List <Message >> e_answers : client .getEtpClientSession ().waitForResponse (msgIdsToWait , 500000 ).entrySet ()){
407398 List <Message > answers = e_answers .getValue ();
408- for (Message answer : answers .stream ().sorted ((m0 , m1 ) -> Integer . compare (( int ) m0 . getHeader (). getMessageId (), ( int ) m1 .getHeader ().getMessageId ())).collect (Collectors .toList ())) {
399+ for (Message answer : answers .stream ().sorted (Comparator . comparingInt (m0 -> ( int ) m0 .getHeader ().getMessageId ())).collect (Collectors .toList ())) {
409400 if (answer .getBody () instanceof GetDataArraysResponse ) {
410401 GetDataArraysResponse gdar = ((GetDataArraysResponse ) answer .getBody ());
411402 for (Map .Entry <CharSequence , DataArray > e_gdar : gdar .getDataArrays ().entrySet ()){
@@ -417,15 +408,6 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
417408 }
418409
419410 }
420- // } else if (answer.getBody() instanceof GetDataSubarraysResponse) {
421- // String identifier = msgIdsToWaitSubArray.get(e_answers.getKey());
422- // logger.debug("identifier : {}", identifier);
423- // logger.debug(String.join(", \n", ((GetDataSubarraysResponse) answer.getBody()).getDataSubarrays().keySet()));
424- // if(!subArrayResps.containsKey(identifier)){
425- // subArrayResps.put(identifier, new ArrayList<>());
426- // }
427- // subArrayResps.get(identifier).add(((GetDataSubarraysResponse) answer.getBody()));
428-
429411 } else {
430412 logger .debug ("Unexpected answer for msg : {}" , answer .getHeader ().getCorrelationId ());
431413 }
@@ -434,15 +416,16 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
434416
435417 for (Map .Entry <Long , List <Message >> e_answers : client .getEtpClientSession ().waitForResponse (new ArrayList <>(msgIdsToWaitSubArray .keySet ()), 500000 ).entrySet ()){
436418 List <Message > answers = e_answers .getValue ();
437- for (Message answer : answers .stream ().sorted ((m0 , m1 ) -> Integer . compare (( int ) m0 . getHeader (). getMessageId (), ( int ) m1 .getHeader ().getMessageId ())).collect (Collectors .toList ())) {
419+ for (Message answer : answers .stream ().sorted (Comparator . comparingInt (m0 -> ( int ) m0 .getHeader ().getMessageId ())).collect (Collectors .toList ())) {
438420 if (answer .getBody () instanceof GetDataSubarraysResponse ) {
439421 String identifier = msgIdsToWaitSubArray .get (e_answers .getKey ());
440422 logger .debug ("identifier : {}" , identifier );
441423 logger .debug (String .join (", \n " , ((GetDataSubarraysResponse ) answer .getBody ()).getDataSubarrays ().keySet ()));
442424 if (!subArrayResps .containsKey (identifier )){
443425 subArrayResps .put (identifier , new ArrayList <>());
444426 }
445- subArrayResps .get (identifier ).add (((GetDataSubarraysResponse ) answer .getBody ()));
427+ // subArrayResps.get(identifier).add(((GetDataSubarraysResponse) answer.getBody()));
428+ subArrayResps .get (identifier ).add (answer );
446429
447430 } else {
448431 logger .debug ("Unexpected answer for msg : {}" , answer .getHeader ().getCorrelationId ());
@@ -451,9 +434,11 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
451434 }
452435
453436 // For subArraysResponse : sort and reduce results for each array
454- for (Map .Entry <String , List <GetDataSubarraysResponse >> sub_arr_r : subArrayResps .entrySet ()){
455- Map <CharSequence , DataArray > allSubArrayParts = sub_arr_r .getValue ().stream ().map (gdsr -> gdsr .getDataSubarrays ().entrySet ()).flatMap (Set ::stream )
456- .collect (Collectors .toMap (Map .Entry <CharSequence , DataArray >::getKey , Map .Entry <CharSequence , DataArray >::getValue ));
437+ for (Map .Entry <String , List <Message >> sub_arr_r : subArrayResps .entrySet ()){
438+ /*Map<CharSequence, DataArray> allSubArrayParts =
439+ sub_arr_r.getValue().stream()
440+ .map(m_gdsr -> ((GetDataSubarraysResponse)m_gdsr.getBody()).getDataSubarrays().entrySet()).flatMap(Set::stream)
441+ .collect(Collectors.toMap(Map.Entry<CharSequence, DataArray>::getKey, Map.Entry<CharSequence, DataArray>::getValue));
457442 List<Number> numbers = allSubArrayParts.entrySet().stream()
458443 .sorted(Comparator.comparingInt(e -> Integer.getInteger(e.getKey().toString())))
459444 .map(e -> {
@@ -464,67 +449,24 @@ public static Map<String, List<Number>> getMultipleDataArrays(ETPClient client,
464449 }
465450 })
466451 .flatMap(List::stream).collect(Collectors.toList());
467- result .put (sub_arr_r .getKey (),numbers );
452+ result.put(sub_arr_r.getKey(),numbers);*/
453+ List <Number > das = sub_arr_r .getValue ().stream ()
454+ .sorted (Comparator .comparingLong (m0 -> m0 .getHeader ().getMessageId ()))
455+ .map (m -> ((GetDataSubarraysResponse ) m .getBody ()).getDataSubarrays ().values ())
456+ .flatMap (Collection ::stream )
457+ .map (e -> {
458+ try {
459+ return (List <Number >) ETPUtils .getAttributeValue (e .getData ().getItem (), "values" );
460+ } catch (Exception ex ) {
461+ throw new RuntimeException (ex );
462+ }
463+ }).filter (Objects ::nonNull )
464+ .flatMap (List ::stream )
465+ .collect (Collectors .toList ());
466+ result .put (sub_arr_r .getKey (), das );
468467 }
469468
470469 return result ;
471-
472-
473- /*Map<CharSequence, DataArrayIdentifier> mapIdentifier = new HashMap<>();
474- for(String ds_path: datasets_paths){
475- DataArrayIdentifier identifier = DataArrayIdentifier.newBuilder()
476- .setUri(uri)
477- .setPathInResource(ds_path)
478- .build();
479- mapIdentifier.put(String.valueOf(mapIdentifier.size()), identifier);
480- }
481-
482- GetDataArrays gda = GetDataArrays.newBuilder()
483- .setDataArrays(mapIdentifier).build();
484- long msg_id = client.send(gda);
485- logger.debug(msg_id + ") GetDataSubArray sent " + gda);
486- List<Message> resp = client.getEtpClientSession().waitForResponse(msg_id, timeoutMS);
487- System.out.println("End Wait");
488- // if(true) return null;
489- Map<CharSequence, DataArray> da_entries = new HashMap<>();
490- for(Message m: resp){
491- if(m.getBody() instanceof GetDataArraysResponse){
492- GetDataArraysResponse gdar = (GetDataArraysResponse) m.getBody();
493- da_entries.putAll(gdar.getDataArrays());
494- // for(Map.Entry<CharSequence, DataArray> entry: gdar.getDataArrays().entrySet()){
495- // try {
496- // da_entries.set(Integer.getInteger(String.valueOf(entry.getKey())), entry.getValue());
497- // }catch (Exception e){
498- // System.err.println("Error for " + da_entries.size() + "idx " + entry.getKey() + " " + entry.getValue());
499- // e.printStackTrace();
500- // }
501- // }
502- }else{
503- logger.error("@getDataArray Unexpected answer : " + m.getBody());
504- }
505- }
506- for(Map.Entry<CharSequence, DataArray> c: da_entries.entrySet()){
507- System.out.println(c.getKey());
508- }
509- da_entries.entrySet().stream().filter(Objects::nonNull).forEach(a -> System.out.println("> " + a));
510- List<DataArray> da_list = da_entries.entrySet().stream().filter(Objects::nonNull).sorted(
511- Comparator.comparingInt(a -> Integer.getInteger(String.valueOf(a.getKey()))))
512- .map(Map.Entry::getValue).collect(Collectors.toList());
513-
514- List<Integer> not_found = IntStream.range(0, datasets_paths.size()).filter( i -> !da_entries.containsKey(String.valueOf(i))).boxed().collect(Collectors.toList());
515- for(Integer i: not_found){
516- System.out.println("NOT FOUND : " + i);
517- }
518-
519- for(DataArray da: da_list){
520- try {
521- result.addAll((Collection<? extends Number>) ETPUtils.getAttributeValue(da.getData().getItem(), "value"));
522- } catch (Exception e) {
523- logger.error(e);
524- }
525- }
526- return result;*/
527-
528470 }
529471
530472
0 commit comments