|
25 | 25 | import io.cdap.cdap.etl.api.FailureCollector; |
26 | 26 | import io.cdap.cdap.etl.api.PipelineConfigurer; |
27 | 27 | import io.cdap.cdap.etl.api.batch.BatchSource; |
| 28 | +import io.cdap.cdap.etl.api.batch.BatchSourceContext; |
28 | 29 | import io.cdap.cdap.etl.api.connector.Connector; |
| 30 | +import io.cdap.plugin.common.Asset; |
29 | 31 | import io.cdap.plugin.common.ConfigUtil; |
| 32 | +import io.cdap.plugin.common.LineageRecorder; |
30 | 33 | import io.cdap.plugin.db.SchemaReader; |
31 | 34 | import io.cdap.plugin.db.batch.config.AbstractDBSpecificSourceConfig; |
32 | 35 | import io.cdap.plugin.db.batch.source.AbstractDBSource; |
33 | 36 | import io.cdap.plugin.postgres.PostgresDBRecord; |
34 | 37 | import io.cdap.plugin.postgres.PostgresSchemaReader; |
35 | 38 | import io.cdap.plugin.util.CloudSQLUtil; |
| 39 | +import io.cdap.plugin.util.DBUtils; |
| 40 | +import org.apache.commons.lang.StringUtils; |
36 | 41 | import org.apache.hadoop.mapreduce.lib.db.DBWritable; |
37 | 42 |
|
38 | 43 | import java.util.Collections; |
@@ -97,6 +102,30 @@ protected String createConnectionString() { |
97 | 102 | cloudsqlPostgresqlSourceConfig.connection.getConnectionName()); |
98 | 103 | } |
99 | 104 |
|
| 105 | + @Override |
| 106 | + protected LineageRecorder getLineageRecorder(BatchSourceContext context) { |
| 107 | + String host; |
| 108 | + String location = ""; |
| 109 | + if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase( |
| 110 | + cloudsqlPostgresqlSourceConfig.getConnection().getInstanceType())) { |
| 111 | + // connection is the private IP address |
| 112 | + host = cloudsqlPostgresqlSourceConfig.getConnection().getConnectionName(); |
| 113 | + } else { |
| 114 | + // connection is of the form <projectId>:<region>:<instanceName> |
| 115 | + String[] connectionParams = cloudsqlPostgresqlSourceConfig.getConnection().getConnectionName().split(":"); |
| 116 | + host = connectionParams[2]; |
| 117 | + location = connectionParams[1]; |
| 118 | + } |
| 119 | + String fqn = DBUtils.constructFQN("postgres", host, 5432, |
| 120 | + cloudsqlPostgresqlSourceConfig.getConnection().getDatabase(), |
| 121 | + cloudsqlPostgresqlSourceConfig.getReferenceName()); |
| 122 | + Asset.Builder assetBuilder = Asset.builder(cloudsqlPostgresqlSourceConfig.getReferenceName()).setFqn(fqn); |
| 123 | + if (!StringUtils.isEmpty(location)) { |
| 124 | + assetBuilder.setLocation(location); |
| 125 | + } |
| 126 | + return new LineageRecorder(context, assetBuilder.build()); |
| 127 | + } |
| 128 | + |
100 | 129 | /** CloudSQL PostgreSQL source config. */ |
101 | 130 | public static class CloudSQLPostgreSQLSourceConfig extends AbstractDBSpecificSourceConfig { |
102 | 131 |
|
|
0 commit comments