|
| 1 | +from jsoniq import RumbleSession |
| 2 | +from unittest import TestCase |
| 3 | +import json |
| 4 | +import pandas as pd |
| 5 | + |
class TryTesting(TestCase):
    """End-to-end smoke test that walks through the RumbleDB Python API."""

    def test1(self):
        """Run the whole tutorial scenario against a live RumbleSession.

        The scenario covers standalone queries, binding Python values,
        pandas DataFrames and pyspark DataFrames to JSONiq variables,
        the different ways of collecting results, and writing results
        back to disk. There are no value assertions: the test passes as
        long as no step raises.
        """
        # The syntax to start a session is similar to that of Spark.
        # A RumbleSession is a SparkSession that additionally knows about RumbleDB.
        # All attributes and methods of SparkSession are also available on RumbleSession.
        rumble = RumbleSession.builder.getOrCreate()

        # Just to improve readability when invoking Spark methods
        # (such as spark.sql() or spark.createDataFrame()).
        spark = rumble

        ##############################
        ###### Your first query ######
        ##############################

        # Even though RumbleDB uses Spark internally, it can be used without any knowledge of Spark.

        # Executing a query is done with rumble.jsoniq() like so. A query returns a sequence
        # of items, here the sequence with just the integer item 2.
        items = rumble.jsoniq('1+1')

        # A sequence of items can simply be converted to a list of Python/JSON values with json().
        # Since there is only one value in the sequence output by this query,
        # we get a singleton list with the integer 2.
        # Generally though, the results may contain zero, one, two, or more items.
        python_list = items.json()
        print(python_list)

        ############################################
        ##### More complex, standalone queries #####
        ############################################

        # JSONiq is very powerful and expressive. You will find tutorials as well as a reference on JSONiq.org.

        seq = rumble.jsoniq("""

        let $stores :=
        [
          { "store number" : 1, "state" : "MA" },
          { "store number" : 2, "state" : "MA" },
          { "store number" : 3, "state" : "CA" },
          { "store number" : 4, "state" : "CA" }
        ]
        let $sales := [
          { "product" : "broiler", "store number" : 1, "quantity" : 20 },
          { "product" : "toaster", "store number" : 2, "quantity" : 100 },
          { "product" : "toaster", "store number" : 2, "quantity" : 50 },
          { "product" : "toaster", "store number" : 3, "quantity" : 50 },
          { "product" : "blender", "store number" : 3, "quantity" : 100 },
          { "product" : "blender", "store number" : 3, "quantity" : 150 },
          { "product" : "socks", "store number" : 1, "quantity" : 500 },
          { "product" : "socks", "store number" : 2, "quantity" : 10 },
          { "product" : "shirt", "store number" : 3, "quantity" : 10 }
        ]
        let $join :=
          for $store in $stores[], $sale in $sales[]
          where $store."store number" = $sale."store number"
          return {
            "nb" : $store."store number",
            "state" : $store.state,
            "sold" : $sale.product
          }
        return [$join]
        """)

        print(seq.json())

        seq = rumble.jsoniq("""
        for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
        group by $store-number := $product.store-number
        order by $store-number ascending
        return {
          "store" : $store-number,
          "products" : [ distinct-values($product.product) ]
        }
        """)
        print(seq.json())

        ############################################################
        ###### Binding JSONiq variables to Python values ###########
        ############################################################

        # It is possible to bind a JSONiq variable to a tuple of native Python values
        # and then use it in a query.
        # In JSONiq, variables are bound to sequences of items, just like the results of JSONiq
        # queries are sequences of items.
        # A Python tuple will be seamlessly converted to a sequence of items by the library.
        # Currently we only support strs, ints, floats, booleans, None, lists, and dicts.
        # But if you need more (like date, bytes, etc) we will add them without any problem.
        # JSONiq has a rich type system.

        rumble.bind('$c', (1, 2, 3, 4, 5, 6))
        print(rumble.jsoniq("""
        for $v in $c
        let $parity := $v mod 2
        group by $parity
        return { switch($parity)
                 case 0 return "even"
                 case 1 return "odd"
                 default return "?" : $v
        }
        """).json())

        rumble.bind('$c', ([1, 2, 3], [4, 5, 6]))
        print(rumble.jsoniq("""
        for $i in $c
        return [
          for $j in $i
          return { "foo" : $j }
        ]
        """).json())

        rumble.bind('$c', ({"foo": [1, 2, 3]}, {"foo": [4, {"bar": [1, False, None]}, 6]}))
        print(rumble.jsoniq('{ "results" : $c.foo[[2]] }').json())

        # It is possible to bind only one value. Then it must be provided as a singleton tuple.
        # This is because in JSONiq, an item is the same as a sequence of one item.
        rumble.bind('$c', (42,))
        print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json())

        # For convenience and code readability, you can also use bindOne().
        rumble.bindOne('$c', 42)
        print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json())

        ##########################################################
        ##### Binding JSONiq variables to pandas DataFrames ######
        ##### Getting the output as a Pandas DataFrame      ######
        ##########################################################

        # Creating a dummy pandas dataframe
        data = {'Name': ['Alice', 'Bob', 'Charlie'],
                'Age': [30, 25, 35]}
        pdf = pd.DataFrame(data)

        # Binding a pandas dataframe
        rumble.bind('$a', pdf)
        seq = rumble.jsoniq('$a.Name')
        # Getting the output as a pandas dataframe
        print(seq.pdf())

        ################################################
        ##### Using Pyspark DataFrames with JSONiq #####
        ################################################

        # The power users can also interface our library with pyspark DataFrames.
        # JSONiq sequences of items can have billions of items, and our library supports this
        # out of the box: it can also run on clusters on AWS Elastic MapReduce for example.
        # But your laptop is just fine, too: it will spread the computations on your cores.
        # You can bind a DataFrame to a JSONiq variable. JSONiq will recognize this
        # DataFrame as a sequence of object items.

        # Create a data frame also similar to Spark (but using the rumble object).
        data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
        columns = ["Name", "Age"]
        df = spark.createDataFrame(data, columns)

        # This is how to bind a JSONiq variable to a dataframe. You can bind as many variables as you want.
        rumble.bind('$a', df)

        # This is how to run a query. This is similar to spark.sql().
        # Since variable $a was bound to a DataFrame, it is automatically declared as an external variable
        # and can be used in the query. In JSONiq, it is logically a sequence of objects.
        res = rumble.jsoniq('$a.Name')

        # There are several ways to collect the outputs, depending on the user needs but also
        # on the query supplied.
        # This returns a list containing one or several of "DataFrame", "RDD", "PUL", "Local"
        # If DataFrame is in the list, df() can be invoked.
        # If RDD is in the list, rdd() can be invoked.
        # If Local is in the list, items() or json() can be invoked, as well as the local iterator API.
        modes = res.availableOutputs()
        for mode in modes:
            print(mode)

        #########################################################
        ###### Manipulating DataFrames with SQL and JSONiq ######
        #########################################################

        # If the output of the JSONiq query is structured (i.e., RumbleDB was able to detect a schema),
        # then we can extract a regular data frame that can be further processed with spark.sql() or rumble.jsoniq().
        df = res.df()
        df.show()

        # We are continuously working on the detection of schemas and RumbleDB will get better at it with time.
        # JSONiq is a very powerful language and can also produce heterogeneous output "by design". Then you need
        # to use rdd() instead of df(), or to collect the list of JSON values (see further down). Remember
        # that availableOutputs() tells you what is at your disposal.

        # A DataFrame output by JSONiq can be reused as input to a Spark SQL query.
        # (Remember that rumble is a wrapper around a SparkSession object, so you can use rumble.sql() just like spark.sql())
        # getOrCreate() may hand back a session that already holds "myview"
        # (e.g. from an earlier run in the same process), so replace rather than create.
        df.createOrReplaceTempView("myview")
        df2 = spark.sql("SELECT * FROM myview").toDF("name")
        df2.show()

        # A DataFrame output by Spark SQL can be reused as input to a JSONiq query.
        rumble.bind('$b', df2)
        seq2 = rumble.jsoniq("for $i in 1 to 5 return $b")
        df3 = seq2.df()
        df3.show()

        # And a DataFrame output by JSONiq can be reused as input to another JSONiq query.
        rumble.bind('$b', df3)
        seq3 = rumble.jsoniq("$b[position() lt 3]")
        df4 = seq3.df()
        df4.show()

        #########################
        ##### Local access ######
        #########################

        # This materializes the rows as items.
        # The items are accessed with the RumbleDB Item API.
        item_list = res.items()
        for result in item_list:
            print(result.getStringValue())

        # This streams through the items one by one.
        # try/finally makes sure the iterator is closed even if a step raises.
        res.open()
        try:
            while res.hasNext():
                print(res.next().getStringValue())
        finally:
            res.close()

        ################################################################################################################
        ###### Native Python/JSON Access for bypassing the Item API (but losing on the richer JSONiq type system) ######
        ################################################################################################################

        # This method directly gets the result as JSON (dict, list, strings, ints, etc).
        jlist = res.json()
        for value in jlist:
            print(value)

        # This streams through the JSON values one by one.
        res.open()
        try:
            while res.hasNext():
                print(res.nextJSON())
        finally:
            res.close()

        # This gets an RDD of JSON values that can be processed by Python
        rdd = res.rdd()
        print(rdd.count())
        for value in rdd.take(10):
            print(value)

        ###################################################
        ###### Write back to the disk (or data lake) ######
        ###################################################

        # It is also possible to write the output to a file locally or on a cluster. The API is similar to that of Spark dataframes.
        # Note that it creates a directory and stores the (potentially very large) output in a sharded directory.
        # RumbleDB was already tested with up to 64 AWS machines and 100s of TBs of data.
        # Of course the examples below are so small that it makes more sense to process the results locally with Python,
        # but this shows how GBs or TBs of data obtained from JSONiq can be written back to disk.
        seq = rumble.jsoniq("$a.Name")
        seq.write().mode("overwrite").json("outputjson")
        seq.write().mode("overwrite").parquet("outputparquet")

        seq = rumble.jsoniq("1+1")
        seq.write().mode("overwrite").text("outputtext")

        # Reaching this point means every step above ran without raising.
        self.assertTrue(True)
0 commit comments