1616// minimal_wordcount is an example that counts words in King Lear,
1717// by William Shakespeare.
1818//
19+ // minimal_wordcount demonstartes a basic Apache Beam pipeline in Go.
20+ // It reads a text file, splits it into words, counts occurences,
21+ // and writes the results to an output file.
22+ //
1923// This example is the first in a series of four successively more detailed
2024// 'word count' examples. Here, for simplicity, we don't show any
2125// error-checking or argument processing, and focus on construction of the
@@ -71,14 +75,19 @@ import (
7175 _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
7276)
7377
78+ // wordRE defines a regualr expression to extract words from a line of text.
79+ // It matches alphabetic words and handles simple apstrophes (eg. don't).
80+
7481var wordRE = regexp .MustCompile (`[a-zA-Z]+('[a-z])?` )
7582
83+ // splitWords takes a line of text and emits each word found in that line.
7684func splitWords (line string , emit func (string )) {
7785 for _ , word := range wordRE .FindAllString (line , - 1 ) {
7886 emit (word )
7987 }
8088}
8189
90+ // formatCounts formats the word and its count into readable string
8291func formatCounts (w string , c int ) string {
8392 return fmt .Sprintf ("%s: %v" , w , c )
8493}
@@ -87,16 +96,24 @@ func formatCounts(w string, c int) string {
8796// so they can be executed by portable runners. We use the register package
8897// in an init block to inform Beam of the functions we will be using, so
8998// it can access them on workers.
99+
100+ // init registers the functions so they can be executed on distrubuted workers.
101+ // Beam requires functions to be registered before execution.
90102func init () {
91103 register .Function2x0 (splitWords )
92104 register .Function2x1 (formatCounts )
93105 register .Emitter1 [string ]()
94106}
95107
108+ // To run this example:
109+ // go run minimal_wordcount.go
110+ // output will be written to "wordcounts.txt"
96111func main () {
112+ // Step 1: Initialise Beam
97113 // beam.Init() is an initialization hook that must be called on startup.
98114 beam .Init ()
99115
116+ // Step 2:
100117 // Create the Pipeline object and root scope.
101118 p := beam .NewPipeline ()
102119 s := p .Root ()
@@ -108,33 +125,39 @@ func main() {
108125 // PCollection where each element is one line from the input text
109126 // (one of Shakespeare's texts).
110127
128+ // Step 3: Read input text file
111129 // This example reads from a public dataset containing the text
112130 // of King Lear.
113131 lines := textio .Read (s , "gs://apache-beam-samples/shakespeare/kinglear.txt" )
114132
133+ // Step 4: split lens into words
115134 // Concept #2: Invoke a ParDo transform on our PCollection of text lines.
116135 // This ParDo invokes a DoFn (registered earlier) on each element that
117136 // tokenizes the text line into individual words. The ParDo returns a
118137 // PCollection of type string, where each element is an individual word in
119138 // Shakespeare's collected texts.
120139 words := beam .ParDo (s , splitWords , lines )
121140
141+ // Step 5: Count words occurences
122142 // Concept #3: Invoke the stats.Count transform on our PCollection of
123143 // individual words. The Count transform returns a new PCollection of
124144 // key/value pairs, where each key represents a unique word in the text.
125145 // The associated value is the occurrence count for that word.
126146 counted := stats .Count (s , words )
127147
148+ // Step 6: Format results
128149 // Use a ParDo to format our PCollection of word counts into a printable
129150 // string, suitable for writing to an output file. When each element
130151 // produces exactly one element, the DoFn can simply return it.
131152 formatted := beam .ParDo (s , formatCounts , counted )
132153
154+ // Step 7: Write output to file
133155 // Concept #4: Invoke textio.Write at the end of the pipeline to write
134156 // the contents of a PCollection (in this case, our PCollection of
135157 // formatted strings) to a text file.
136158 textio .Write (s , "wordcounts.txt" , formatted )
137159
160+ // Step 8: Execute pipeline
138161 // Run the pipeline on the prism runner.
139162 prism .Execute (context .Background (), p )
140163}
0 commit comments