|
1 | 1 | package sqlline; |
2 | 2 |
|
3 | | -import com.linkedin.hoptimator.Pipeline; |
4 | | -import com.linkedin.hoptimator.Source; |
5 | 3 | import com.linkedin.hoptimator.SqlDialect; |
6 | 4 | import com.linkedin.hoptimator.jdbc.HoptimatorConnection; |
7 | 5 | import com.linkedin.hoptimator.jdbc.HoptimatorDdlUtils; |
@@ -278,56 +276,13 @@ public void execute(String line, DispatchCallback dispatchCallback) { |
278 | 276 | } |
279 | 277 | String sql = split[1]; |
280 | 278 | HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection(); |
281 | | - Pair<SchemaPlus, Table> schemaSnapshot = null; |
282 | | - String viewName = "sink"; |
283 | 279 | try { |
284 | | - String querySql = sql; |
285 | | - SqlCreateMaterializedView create = null; |
286 | | - SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql); |
287 | | - if (sqlNode.getKind().belongsTo(SqlKind.DDL)) { |
288 | | - if (sqlNode instanceof SqlCreateMaterializedView) { |
289 | | - create = (SqlCreateMaterializedView) sqlNode; |
290 | | - final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query); |
291 | | - querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql(); |
292 | | - viewName = HoptimatorDdlUtils.viewName(create.name); |
293 | | - } else { |
294 | | - sqlline.error("Unsupported DDL statement: " + sql); |
295 | | - dispatchCallback.setToFailure(); |
296 | | - return; |
297 | | - } |
298 | | - } |
299 | | - |
300 | | - RelRoot root = HoptimatorDriver.convert(conn, querySql).root; |
301 | | - Properties connectionProperties = conn.connectionProperties(); |
302 | | - RelOptTable table = root.rel.getTable(); |
303 | | - if (table != null) { |
304 | | - connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName())); |
305 | | - } else if (create != null) { |
306 | | - connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, create.name.toString()); |
307 | | - } |
308 | | - PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties); |
309 | | - if (create != null) { |
310 | | - schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(), |
311 | | - new HoptimatorDriver.Prepare(conn), plan, create, querySql); |
312 | | - } |
313 | | - Pipeline pipeline = plan.pipeline(viewName, conn); |
314 | | - List<String> specs = new ArrayList<>(); |
315 | | - for (Source source : pipeline.sources()) { |
316 | | - specs.addAll(DeploymentService.specify(source, conn)); |
317 | | - } |
318 | | - specs.addAll(DeploymentService.specify(pipeline.sink(), conn)); |
319 | | - specs.addAll(DeploymentService.specify(pipeline.job(), conn)); |
| 280 | + List<String> specs = HoptimatorDdlUtils.specifyFromSql(sql, conn).specs; |
320 | 281 | specs.forEach(x -> sqlline.output(x + "\n\n---\n\n")); |
321 | 282 | } catch (SQLException e) { |
322 | 283 | sqlline.error(e); |
323 | 284 | dispatchCallback.setToFailure(); |
324 | 285 | } |
325 | | - if (schemaSnapshot != null) { |
326 | | - if (schemaSnapshot.right != null) { |
327 | | - schemaSnapshot.left.add(viewName, schemaSnapshot.right); |
328 | | - } |
329 | | - schemaSnapshot.left.removeTable(viewName); |
330 | | - } |
331 | 286 | } |
332 | 287 |
|
333 | 288 | @Override |
|
0 commit comments