Skip to main content
Version: 0.11.0

Use of 0.X SDK

Linkis provides a convenient client interface for Java and Scala. To use it, you only need to add the linkis-ujes-client module as a dependency.

1 Introduce dependent modules

<dependency>  <groupId>com.webank.wedatasphere.linkis</groupId>  <artifactId>linkis-ujes-client</artifactId>  <version>0.9.3</version></dependency>

2 Java test code

Create the Java test class UJESClientTest; the meaning of each interface call is explained in the code comments:

package com.wedatasphere.linkis.ujes.test;
import com.webank.wedatasphere.linkis.common.utils.Utils;import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;import com.webank.wedatasphere.linkis.ujes.client.UJESClient;import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;import org.apache.commons.io.IOUtils;
import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;
public class UJESClientTest {
    public static void main(String[] args){
        String user = "hadoop";        String executeCode = "show databases;";
        // 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder        DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()                .addUJESServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port}                .connectionTimeout(30000) //connectionTimeOut client connection timeout                .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered                .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless                .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent                .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry                .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method                .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name                .setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1
        // 2. Get a UJESClient through DWSClientConfig        UJESClient client = new UJESClientImpl(clientConfig);
        try {            // 3. Start code execution            System.out.println("user: "+ user + ", code: [" + executeCode + "]");            Map<String, Object> startupMap = new HashMap<String, Object>();            startupMap.put("wds.linkis.yarnqueue", "default"); // Various startup parameters can be stored in startupMap, see linkis management console configuration            JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()                    .setCreator("linkisClient-Test") //creator, the system name of the client requesting linkis, used for system-level isolation                    .addExecuteCode(executeCode) //ExecutionCode The code to be executed                    .setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // The execution engine type of the linkis that you want to request, such as Spark hive, etc.                    .setUser(user) //User, request user; used for user-level multi-tenant isolation                    .setStartupParams(startupMap)                    .build());            System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());
            // 4. Get the execution status of the script            JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);            int sleepTimeMills = 1000;            while(!jobInfoResult.isCompleted()) {                // 5. Get the execution progress of the script                JobProgressResult progress = client.progress(jobExecuteResult);                Utils.sleepQuietly(sleepTimeMills);                jobInfoResult = client.getJobInfo(jobExecuteResult);            }
            // 6. Get the job information of the script            JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);            // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated)            String resultSet = jobInfo.getResultSetList(client)[0];            // 8. Get a specific result set through a result set information            Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();            System.out.println("fileContents: "+ fileContents);
        } catch (Exception e) {            e.printStackTrace();            IOUtils.closeQuietly(client);        }        IOUtils.closeQuietly(client);    }}

Run the above code to interact with Linkis

3 Scala test code

package com.wedatasphere.linkis.ujes.test
import java.util.concurrent.TimeUnit

import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
import org.apache.commons.io.IOUtils
/**
 * Minimal end-to-end example of running a script on Linkis through the UJES Scala client.
 *
 * The example builds a client configuration, submits `show databases;` to the Spark engine,
 * polls until the job completes, and prints the result-set list and the first result set.
 * Replace the `${ip}`, `${port}`, `${username}` and `${password}` placeholders with real
 * values before running.
 */
object UJESClientImplTest extends App {

  val executeCode = "show databases;"
  val user = "hadoop"

  // 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder
  val clientConfig = DWSClientConfigBuilder.newBuilder()
    // ServerUrl: the address of the Linkis server-side gateway, such as http://{ip}:{port}
    .addUJESServerUrl("http://${ip}:${port}")
    // Client connection timeout
    .connectionTimeout(30000)
    // Whether to enable registration discovery; if enabled, newly launched gateways are
    // discovered automatically
    .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES)
    // Whether to enable load balancing; meaningless unless registration discovery is enabled
    .loadbalancerEnabled(true)
    // Maximum number of connections, i.e. the maximum concurrency
    .maxConnectionSize(5)
    // Whether to retry when execution fails, and the read timeout
    .retryEnabled(false).readTimeout(30000)
    // Linkis authentication method
    .setAuthenticationStrategy(new StaticAuthenticationStrategy())
    // Authentication key (generally the user name) and value (generally the password
    // corresponding to that user name)
    .setAuthTokenKey("${username}").setAuthTokenValue("${password}")
    // Linkis backend protocol version; the current version is v1
    .setDWSVersion("v1").build()

  // 2. Get a UJESClient through DWSClientConfig
  val client = UJESClient(clientConfig)

  try {
    // 3. Start code execution
    println("user: "+ user + ", code: [" + executeCode + "]")
    val startupMap = new java.util.HashMap[String, Any]()
    startupMap.put("wds.linkis.yarnqueue", "default") // Startup parameter configuration
    val jobExecuteResult = client.execute(JobExecuteAction.builder()
      // Creator: the system name of the client requesting Linkis, used for system-level isolation
      .setCreator("LinkisClient-Test")
      // The code to be executed
      .addExecuteCode(executeCode)
      // The Linkis execution engine type to request, such as Spark, Hive, etc.
      .setEngineType(EngineType.SPARK)
      .setStartupParams(startupMap)
      // Request user; used for user-level multi-tenant isolation
      .setUser(user).build())
    println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)

    // 4. Get the execution status of the script
    var jobInfoResult = client.getJobInfo(jobExecuteResult)
    val sleepTimeMills: Int = 1000
    while (!jobInfoResult.isCompleted) {
      // 5. Get the execution progress of the script
      val progress = client.progress(jobExecuteResult)
      val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
      println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo)
      Utils.sleepQuietly(sleepTimeMills)
      jobInfoResult = client.getJobInfo(jobExecuteResult)
    }
    if (!jobInfoResult.isSucceed) {
      println("Failed to execute job: "+ jobInfoResult.getMessage)
      throw new Exception(jobInfoResult.getMessage)
    }

    // 6. Get the job information of the script
    val jobInfo = client.getJobInfo(jobExecuteResult)
    // 7. Get the list of result sets (if the user submits multiple SQL statements at a
    //    time, multiple result sets will be generated). Fetched once and reused below.
    val resultSetList = jobInfo.getResultSetList(client)
    println("All result set list:")
    resultSetList.foreach(println)
    resultSetList.headOption match {
      case Some(oneResultSet) =>
        // 8. Get a specific result set through a result set information
        val fileContents = client.resultSet(ResultSetAction.builder()
          .setPath(oneResultSet)
          .setUser(jobExecuteResult.getUser)
          .build()).getFileContent
        println("First fileContents: ")
        println(fileContents)
      case None =>
        println("The job produced no result set.")
    }
  } catch {
    case e: Exception =>
      e.printStackTrace()
  } finally {
    // Close the client on every exit path, including non-Exception throwables.
    IOUtils.closeQuietly(client)
  }
}