2121import org .apache .beam .sdk .Pipeline ;
2222import org .apache .beam .sdk .io .TextIO ;
2323import org .apache .beam .sdk .transforms .Count ;
24+ import org .apache .beam .sdk .transforms .Filter ;
2425import org .apache .beam .sdk .transforms .FlatMapElements ;
2526import org .apache .beam .sdk .transforms .MapElements ;
2627import org .apache .beam .sdk .values .KV ;
2728import org .apache .beam .sdk .values .TypeDescriptors ;
28- import org .apache .beam .sdk .transforms .Filter ;
2929
3030/**
3131 * A minimal "pico" example of WordCount.
@@ -39,17 +39,18 @@ public static void main(String[] args) {
3939 Pipeline p = Pipeline .create ();
4040
4141 p .apply ("ReadLines" , TextIO .read ().from ("input.txt" ))
42- .apply ("ExtractWords" ,
43- FlatMapElements .into (TypeDescriptors .strings ())
44- .via ((String line ) -> Arrays .asList (line .split ("\\ W+" ))))
45- .apply ("FilterEmptyWords" ,
46- Filter .by ((String word ) -> !word .isEmpty ()))
47- .apply ("CountWords" , Count .perElement ())
48- .apply ("FormatResults" ,
49- MapElements .into (TypeDescriptors .strings ())
50- .via ((KV <String , Long > kv ) -> kv .getKey () + ": " + kv .getValue ()))
51- .apply ("WriteResults" , TextIO .write ().to ("output" ));
42+ .apply (
43+ "ExtractWords" ,
44+ FlatMapElements .into (TypeDescriptors .strings ())
45+ .via ((String line ) -> Arrays .asList (line .split ("\\ W+" ))))
46+ .apply ("FilterEmptyWords" , Filter .by ((String word ) -> !word .isEmpty ()))
47+ .apply ("CountWords" , Count .perElement ())
48+ .apply (
49+ "FormatResults" ,
50+ MapElements .into (TypeDescriptors .strings ())
51+ .via ((KV <String , Long > kv ) -> kv .getKey () + ": " + kv .getValue ()))
52+ .apply ("WriteResults" , TextIO .write ().to ("output" ));
5253
5354 p .run ().waitUntilFinish ();
5455 }
55- }
56+ }
0 commit comments