DataSource Client SDK 的使用
Linkis DataSource 提供了方便的JAVA和SCALA调用的接口,只需要引入linkis-datasource-client的模块就可以进行使用,
1. 引入依赖模块#
<dependency> <groupId>org.apache.linkis</groupId> <artifactId>linkis-datasource-client</artifactId> <version>${linkis.version}</version> </dependency>如: <dependency> <groupId>org.apache.linkis</groupId> <artifactId>linkis-datasource-client</artifactId> <version>1.1.0</version> </dependency>2. Scala测试代码#
建立Scala的测试类LinkisDataSourceClientTest,具体接口含义可以见注释:
import com.fasterxml.jackson.databind.ObjectMapperimport org.apache.linkis.common.utils.JsonUtilsimport org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}import org.apache.linkis.datasource.client.request.{CreateDataSourceAction, GetAllDataSourceTypesAction, MetadataGetDatabasesAction, UpdateDataSourceParameterAction}import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategyimport org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import java.io.StringWriterimport java.utilimport java.util.concurrent.TimeUnit
object LinkisDataSourceClientTest {
def main(args: Array[String]): Unit = { val clientConfig =DWSClientConfigBuilder.newBuilder .addServerUrl("http://127.0.0.1:9001") //set linkis-mg-gateway url: http://{ip}:{port} .connectionTimeout(30000) //connection timtout .discoveryEnabled(false) //disable discovery .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency .loadbalancerEnabled(true) // enable loadbalance .maxConnectionSize(5) // set max Connection .retryEnabled(false) // set retry .readTimeout(30000) //set read timeout .setAuthenticationStrategy(new StaticAuthenticationStrategy) //AuthenticationStrategy Linkis authen suppory static and Token .setAuthTokenKey("hadoop") // set submit user .setAuthTokenValue("xxx") // set passwd or token .setDWSVersion("v1") //linkis rest version v1 .build //init datasource remote client val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig) //init metadata remote client val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig) //get all datasource type testGetAllDataSourceTypes(dataSourceClient) //create kafka datasource testCreateDataSourceForKafka(dataSourceClient) //create es datasource testCreateDataSourceForEs(dataSourceClient) //update datasource parameter for kafka testUpdateDataSourceParameterForKafka(dataSourceClient) //update datasource parameter for es testUpdateDataSourceParameterForEs(dataSourceClient) //get hive metadata database list testMetadataGetDatabases(metaDataClient) }
def testGetAllDataSourceTypes(client:LinkisDataSourceRemoteClient): Unit ={ val getAllDataSourceTypesResult = client.getAllDataSourceTypes(GetAllDataSourceTypesAction.builder().setUser("hadoop").build()).getAllDataSourceType System.out.println(getAllDataSourceTypesResult) }
def testCreateDataSourceForKafka(client:LinkisDataSourceRemoteClient): Unit ={ val dataSource = new DataSource(); val dataSourceType = new DataSourceType dataSourceType.setName("kafka") dataSourceType.setId("2") dataSourceType.setLayers(2) dataSourceType.setClassifier("消息队列") dataSourceType.setDescription("kafka") dataSource.setDataSourceType(dataSourceType) dataSource.setDataSourceName("kafka-test") dataSource.setCreateSystem("client") dataSource.setDataSourceTypeId(2l);
val dsJsonWriter = new StringWriter
val mapper = new ObjectMapper JsonUtils.jackson.writeValue(dsJsonWriter, dataSource) val map = mapper.readValue(dsJsonWriter.toString,new util.HashMap[String,Any]().getClass) val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id System.out.println(id)
}
def testCreateDataSourceForEs(client:LinkisDataSourceRemoteClient): Unit ={ val dataSource = new DataSource(); dataSource.setDataSourceName("es-test") dataSource.setCreateSystem("client") dataSource.setDataSourceTypeId(7l); val dsJsonWriter = new StringWriter val mapper = new ObjectMapper JsonUtils.jackson.writeValue(dsJsonWriter, dataSource) val map = mapper.readValue(dsJsonWriter.toString,new util.HashMap[String,Any]().getClass) val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id System.out.println(id)
}
def testUpdateDataSourceParameterForKafka(client:LinkisDataSourceRemoteClient): Unit ={ val params = new util.HashMap[String,Any]() val connParams = new util.HashMap[String,Any]() connParams.put("brokers","172.24.2.232:9092") params.put("connectParams",connParams) params.put("comment","kafka data source") client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("7").addRequestPayloads(params).build()) }
def testUpdateDataSourceParameterForEs(client:LinkisDataSourceRemoteClient): Unit ={ val params = new util.HashMap[String,Any]() val connParams = new util.HashMap[String,Any]() val elasticUrls = new util.ArrayList[String]() elasticUrls.add("http://172.24.2.231:9200") connParams.put("elasticUrls",elasticUrls) params.put("connectParams",connParams) params.put("comment","es data source") client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("8").addRequestPayloads(params).build()) }
def testMetadataGetDatabases(client:LinkisMetaDataRemoteClient): Unit ={ client.getDatabases(MetadataGetDatabasesAction.builder().setUser("hadoop").setDataSourceId(9l).setUser("hadoop").setSystem("client").build()).getDbs }}