Use of 1.0 SDK
Linkis provides a convenient interface for calling JAVA and SCALA. You only need to introduce the linkis-computation-client module to use it. After 1.0, the method of submitting with Label is added. The following will be compatible with 0.X and 1.0. New way to introduce
1 Introduce dependent modules#
<dependency> <groupId>com.webank.wedatasphere.linkis</groupId> <artifactId>linkis-computation-client</artifactId> <version>${linkis.version}</version></dependency>like:<dependency> <groupId>com.webank.wedatasphere.linkis</groupId> <artifactId>linkis-computation-client</artifactId> <version>1.0.0-RC1</version></dependency>2 Compatible with 0.X Execute method submission#
2.1 Java test code#
Establish the Java test class UJESClientImplTestJ, the specific interface meaning can be seen in the notes:
package com.webank.wedatasphere.linkis.client.test
import com.webank.wedatasphere.linkis.common.utils.Utils;import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;import com.webank.wedatasphere.linkis.ujes.client.UJESClient;import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;import org.apache.commons.io.IOUtils;
import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;
public class LinkisClientTest {
public static void main(String[] args){
String user = "hadoop"; String executeCode = "show databases;";
// 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder() .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port} .connectionTimeout(30000) //connectionTimeOut client connection timeout .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name .setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1
// 2. Get a UJESClient through DWSClientConfig UJESClient client = new UJESClientImpl(clientConfig);
try { // 3. Start code execution System.out.println("user: "+ user + ", code: [" + executeCode + "]"); Map<String, Object> startupMap = new HashMap<String, Object>(); startupMap.put("wds.linkis.yarnqueue", "default"); // Various startup parameters can be stored in startupMap, see linkis management console configuration JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder() .setCreator("linkisClient-Test") //creator, the system name of the client requesting linkis, used for system-level isolation .addExecuteCode(executeCode) //ExecutionCode The code to be executed .setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // The execution engine type of the linkis that you want to request, such as Spark hive, etc. .setUser(user) //User, request user; used for user-level multi-tenant isolation .setStartupParams(startupMap) .build()); System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());
// 4. Get the execution status of the script JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult); int sleepTimeMills = 1000; while(!jobInfoResult.isCompleted()) { // 5. Get the execution progress of the script JobProgressResult progress = client.progress(jobExecuteResult); Utils.sleepQuietly(sleepTimeMills); jobInfoResult = client.getJobInfo(jobExecuteResult); }
// 6. Get the job information of the script JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult); // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated) String resultSet = jobInfo.getResultSetList(client)[0]; // 8. Get a specific result set through a result set information Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent(); System.out.println("fileContents: "+ fileContents); } catch (Exception e) { e.printStackTrace(); IOUtils.closeQuietly(client); } IOUtils.closeQuietly(client); } }Run the above code to interact with Linkis
2.2 Scala test code#
package com.webank.wedatasphere.linkis.client.test
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.Utilsimport com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategyimport com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilderimport com.webank.wedatasphere.linkis.ujes.client.UJESClientimport com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineTypeimport com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}import org.apache.commons.io.IOUtils
object LinkisClientImplTest extends App {
var executeCode = "show databases;" var user = "hadoop"
// 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder val clientConfig = DWSClientConfigBuilder.newBuilder() .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the Linkis server-side gateway, such as http://{ip}:{port} .connectionTimeout(30000) //connectionTimeOut client connection timeout .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name .setDWSVersion("v1").build() //Linkis backend protocol version, the current version is v1
// 2. Get a UJESClient through DWSClientConfig val client = UJESClient(clientConfig)
try { // 3. Start code execution println("user: "+ user + ", code: [" + executeCode + "]") val startupMap = new java.util.HashMap[String, Any]() startupMap.put("wds.linkis.yarnqueue", "default") //Startup parameter configuration val jobExecuteResult = client.execute(JobExecuteAction.builder() .setCreator("LinkisClient-Test") //creator, requesting the system name of the Linkis client, used for system-level isolation .addExecuteCode(executeCode) //ExecutionCode The code to be executed .setEngineType(EngineType.SPARK) // The execution engine type of Linkis that you want to request, such as Spark hive, etc. .setStartupParams(startupMap) .setUser(user).build()) //User, request user; used for user-level multi-tenant isolation println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)
// 4. Get the execution status of the script var jobInfoResult = client.getJobInfo(jobExecuteResult) val sleepTimeMills: Int = 1000 while (!jobInfoResult.isCompleted) { // 5. Get the execution progress of the script val progress = client.progress(jobExecuteResult) val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo) Utils.sleepQuietly(sleepTimeMills) jobInfoResult = client.getJobInfo(jobExecuteResult) } if (!jobInfoResult.isSucceed) { println("Failed to execute job: "+ jobInfoResult.getMessage) throw new Exception(jobInfoResult.getMessage) }
// 6. Get the job information of the script val jobInfo = client.getJobInfo(jobExecuteResult) // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated) val resultSetList = jobInfoResult.getResultSetList(client) println("All result set list:") resultSetList.foreach(println) val oneResultSet = jobInfo.getResultSetList(client).head // 8. Get a specific result set through a result set information val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent println("First fileContents: ") println(fileContents) } catch { case e: Exception => { e.printStackTrace() } } IOUtils.closeQuietly(client)}3 1.0 New Submit Method#
1.0 adds the client.submit method, which is used to interface with the new task execution interface of 1.0, and supports the input of Label and other parameters
3.1 Java Test Class#
package com.webank.wedatasphere.linkis.client.test;
import com.webank.wedatasphere.linkis.common.utils.Utils;import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant;import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;import com.webank.wedatasphere.linkis.ujes.client.UJESClient;import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction;import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;import org.apache.commons.io.IOUtils;
import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;
public class JavaClientTest {
public static void main(String[] args){
String user = "hadoop"; String executeCode = "show tables";
// 1. Configure ClientBuilder and get ClientConfig DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder() .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port} .connectionTimeout(30000) //connectionTimeOut client connection timeout .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name .setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1
// 2. Get a UJESClient through DWSClientConfig UJESClient client = new UJESClientImpl(clientConfig);
try { // 3. Start code execution System.out.println("user: "+ user + ", code: [" + executeCode + "]"); Map<String, Object> startupMap = new HashMap<String, Object>(); // Various startup parameters can be stored in startupMap, see linkis management console configuration startupMap.put("wds.linkis.yarnqueue", "q02"); //Specify Label Map<String, Object> labels = new HashMap<String, Object>(); //Add the label that this execution depends on: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1"); labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE"); labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql"); //Specify source Map<String, Object> source = new HashMap<String, Object>(); source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test"); JobExecuteResult jobExecuteResult = client.submit( JobSubmitAction.builder() .addExecuteCode(executeCode) .setStartupParams(startupMap) .setUser(user)//Job submit user .addExecuteUser(user)//The actual execution user .setLabels(labels) .setSource(source) .build() ); System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());
// 4. Get the execution status of the script JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult); int sleepTimeMills = 1000; while(!jobInfoResult.isCompleted()) { // 5. Get the execution progress of the script JobProgressResult progress = client.progress(jobExecuteResult); Utils.sleepQuietly(sleepTimeMills); jobInfoResult = client.getJobInfo(jobExecuteResult); }
// 6. Get the job information of the script JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult); // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated) String resultSet = jobInfo.getResultSetList(client)[0]; // 8. Get a specific result set through a result set information Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent(); System.out.println("fileContents: "+ fileContents);
} catch (Exception e) { e.printStackTrace(); IOUtils.closeQuietly(client); } IOUtils.closeQuietly(client); }}
3.2 Scala Test Class#
package com.webank.wedatasphere.linkis.client.test
import java.utilimport java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.Utilsimport com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategyimport com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilderimport com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstantimport com.webank.wedatasphere.linkis.protocol.constants.TaskConstantimport com.webank.wedatasphere.linkis.ujes.client.UJESClientimport com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, ResultSetAction}import org.apache.commons.io.IOUtils
object ScalaClientTest {
def main(args: Array[String]): Unit = { val executeCode = "show tables" val user = "hadoop"
// 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder val clientConfig = DWSClientConfigBuilder.newBuilder() .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the Linkis server-side gateway, such as http://{ip}:{port} .connectionTimeout(30000) //connectionTimeOut client connection timeout .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name .setDWSVersion("v1").build() //Linkis backend protocol version, the current version is v1
// 2. Get a UJESClient through DWSClientConfig val client = UJESClient(clientConfig)
try { // 3. Start code execution println("user: "+ user + ", code: [" + executeCode + "]") val startupMap = new java.util.HashMap[String, Any]() startupMap.put("wds.linkis.yarnqueue", "q02") //Startup parameter configuration //Specify Label val labels: util.Map[String, Any] = new util.HashMap[String, Any] //Add the label that this execution depends on, such as engineLabel labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1") labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE") labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql") //Specify source val source: util.Map[String, Any] = new util.HashMap[String, Any] source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test") val jobExecuteResult = client.submit(JobSubmitAction.builder .addExecuteCode(executeCode) .setStartupParams(startupMap) .setUser(user) //Job submit user .addExecuteUser(user) //The actual execution user .setLabels(labels) .setSource(source) .build) //User, requesting user; used for user-level multi-tenant isolation println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)
// 4. Get the execution status of the script var jobInfoResult = client.getJobInfo(jobExecuteResult) val sleepTimeMills: Int = 1000 while (!jobInfoResult.isCompleted) { // 5. Get the execution progress of the script val progress = client.progress(jobExecuteResult) val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo) Utils.sleepQuietly(sleepTimeMills) jobInfoResult = client.getJobInfo(jobExecuteResult) } if (!jobInfoResult.isSucceed) { println("Failed to execute job: "+ jobInfoResult.getMessage) throw new Exception(jobInfoResult.getMessage) }
// 6. Get the job information of the script val jobInfo = client.getJobInfo(jobExecuteResult) // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated) val resultSetList = jobInfoResult.getResultSetList(client) println("All result set list:") resultSetList.foreach(println) val oneResultSet = jobInfo.getResultSetList(client).head // 8. Get a specific result set through a result set information val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent println("First fileContents: ") println(fileContents) } catch { case e: Exception => { e.printStackTrace() } } IOUtils.closeQuietly(client) }
}