3636import java .util .stream .Collectors ;
3737import javax .annotation .Nullable ;
3838import javax .sql .DataSource ;
39- import net .snowflake .client .jdbc .SnowflakeBasicDataSource ;
39+ import net .snowflake .client .api .datasource .SnowflakeDataSource ;
40+ import net .snowflake .client .api .datasource .SnowflakeDataSourceFactory ;
4041import net .snowflake .ingest .SimpleIngestManager ;
4142import net .snowflake .ingest .connection .HistoryResponse ;
4243import org .apache .beam .sdk .coders .Coder ;
@@ -1405,6 +1406,9 @@ public abstract static class DataSourceConfiguration implements Serializable {
14051406 @ Nullable
14061407 public abstract ValueProvider <String > getPassword ();
14071408
1409+ @ Nullable
1410+ public abstract ValueProvider <String > getAccount ();
1411+
14081412 @ Nullable
14091413 public abstract PrivateKey getPrivateKey ();
14101414
@@ -1457,6 +1461,8 @@ abstract static class Builder {
14571461
14581462 abstract Builder setPassword (ValueProvider <String > password );
14591463
1464+ abstract Builder setAccount (ValueProvider <String > account );
1465+
14601466 abstract Builder setPrivateKey (PrivateKey privateKey );
14611467
14621468 abstract Builder setRawPrivateKey (ValueProvider <String > rawPrivateKey );
@@ -1802,7 +1808,7 @@ public DataSourceConfiguration withAuthenticator(String authenticator) {
18021808 }
18031809
18041810 /**
1805- * Sets loginTimeout that will be used in {@link SnowflakeBasicDataSource #setLoginTimeout}.
1811+ * Sets loginTimeout that will be used in {@link SnowflakeDataSource #setLoginTimeout}.
18061812 *
18071813 * @param loginTimeout Integer with timeout value.
18081814 */
@@ -1819,59 +1825,62 @@ void populateDisplayData(DisplayData.Builder builder) {
18191825 }
18201826 }
18211827
1822- /** Builds {@link SnowflakeBasicDataSource } based on the current configuration. */
1828+ /** Builds {@link SnowflakeDataSource } based on the current configuration. */
18231829 public DataSource buildDatasource () {
18241830 if (getDataSource () == null ) {
1825- SnowflakeBasicDataSource basicDataSource = new SnowflakeBasicDataSource ();
1826- basicDataSource .setUrl (buildUrl ());
1831+ SnowflakeDataSource dataSource = SnowflakeDataSourceFactory . createDataSource ();
1832+ dataSource .setUrl (buildUrl ());
18271833
18281834 if (isNotEmpty (getOauthToken ())) {
1829- basicDataSource . setOauthToken (getOauthToken ().get ());
1835+ dataSource . setToken (getOauthToken ().get ());
18301836 } else if (isNotEmpty (getUsername ()) && getPrivateKey () != null ) {
1831- basicDataSource .setUser (getUsername ().get ());
1832- basicDataSource .setPrivateKey (getPrivateKey ());
1837+ dataSource .setUser (getUsername ().get ());
1838+ dataSource .setPrivateKey (getPrivateKey ());
18331839 } else if (isNotEmpty (getUsername ()) && isNotEmpty (getRawPrivateKey ())) {
18341840 PrivateKey privateKey =
18351841 KeyPairUtils .preparePrivateKey (
18361842 getRawPrivateKey ().get (), getValueOrNull (getPrivateKeyPassphrase ()));
1837- basicDataSource .setPrivateKey (privateKey );
1838- basicDataSource .setUser (getUsername ().get ());
1843+ dataSource .setPrivateKey (privateKey );
1844+ dataSource .setUser (getUsername ().get ());
18391845 } else if (isNotEmpty (getUsername ()) && isNotEmpty (getPassword ())) {
1840- basicDataSource .setUser (getUsername ().get ());
1841- basicDataSource .setPassword (getPassword ().get ());
1846+ dataSource .setUser (getUsername ().get ());
1847+ dataSource .setPassword (getPassword ().get ());
18421848 } else {
18431849 throw new RuntimeException ("Missing credentials values. Please check your credentials" );
18441850 }
18451851
1852+ if (isNotEmpty (getAccount ())) {
1853+ dataSource .setAccount (getAccount ().get ());
1854+ }
18461855 if (isNotEmpty (getDatabase ())) {
1847- basicDataSource .setDatabaseName (getDatabase ().get ());
1856+ dataSource .setDatabaseName (getDatabase ().get ());
18481857 }
18491858 if (isNotEmpty (getWarehouse ())) {
1850- basicDataSource .setWarehouse (getWarehouse ().get ());
1859+ dataSource .setWarehouse (getWarehouse ().get ());
18511860 }
18521861 if (isNotEmpty (getSchema ())) {
1853- basicDataSource .setSchema (getSchema ().get ());
1862+ dataSource .setSchema (getSchema ().get ());
18541863 }
18551864 if (isNotEmpty (getServerName ())) {
1856- basicDataSource .setServerName (getServerName ().get ());
1865+ dataSource .setServerName (getServerName ().get ());
18571866 }
18581867 if (getPortNumber () != null ) {
1859- basicDataSource .setPortNumber (getPortNumber ());
1868+ dataSource .setPortNumber (getPortNumber ());
18601869 }
18611870 if (isNotEmpty (getRole ())) {
1862- basicDataSource .setRole (getRole ().get ());
1871+ dataSource .setRole (getRole ().get ());
18631872 }
18641873 if (getAuthenticator () != null ) {
1865- basicDataSource .setAuthenticator (getAuthenticator ());
1874+ dataSource .setAuthenticator (getAuthenticator ());
18661875 }
18671876 if (getLoginTimeout () != null ) {
18681877 try {
1869- basicDataSource .setLoginTimeout (getLoginTimeout ());
1878+ dataSource .setLoginTimeout (getLoginTimeout ());
18701879 } catch (SQLException e ) {
18711880 throw new RuntimeException ("Failed to setLoginTimeout" );
18721881 }
18731882 }
1874- return basicDataSource ;
1883+ return dataSource ;
18751884 }
18761885 return getDataSource ();
18771886 }
0 commit comments