Version: 1.2.0

Sqoop Engine usage documentation

This article describes how to configure, deploy and use the Sqoop engine in Linkis 1.X.

1. Sqoop engine Linkis system parameter configuration

The Sqoop engine depends mainly on the basic Hadoop environment: any node on which the Sqoop engine is deployed must also have the Hadoop client environment deployed.

Before running Sqoop tasks through Linkis, we strongly recommend running a test task with native Sqoop on the node to verify that the node environment works.
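One simple native test is `sqoop list-databases` against your source database, run from the same node. The Scala sketch below only assembles that command and runs it through `scala.sys.process`; the object and method names are illustrative, the JDBC URL and user name are placeholders, and it assumes the `sqoop` launcher is on the node's `PATH`.

```scala
import scala.sys.process._

object NativeSqoopCheck {
  // Build a native `sqoop list-databases` command line.
  // `-P` makes Sqoop prompt for the password on the console
  // instead of exposing it on the command line.
  def listDatabasesCommand(jdbcUrl: String, user: String): Seq[String] =
    Seq("sqoop", "list-databases", "--connect", jdbcUrl, "--username", user, "-P")

  // Run the command with this process's stdin attached (so the -P prompt works)
  // and return Sqoop's exit code; 0 means the database could be reached.
  // Assumes `sqoop` is on PATH; otherwise an IOException is thrown.
  def run(cmd: Seq[String]): Int = cmd.!<

  def main(args: Array[String]): Unit = {
    // Placeholder connection details: replace them with your own.
    val cmd = listDatabasesCommand("jdbc:mysql://127.0.0.1:3306/exchangis", "free")
    println(cmd.mkString(" "))
    println(s"sqoop exited with code ${run(cmd)}")
  }
}
```

If this command fails on the node, fix the environment first; the Linkis Sqoop engine relies on the same Hadoop and Sqoop setup.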

| Environment Variable Name | Environment Variable Content | Remark |
|---|---|---|
| JAVA_HOME | JDK installation path | Required |
| HADOOP_HOME | Hadoop installation path | Required |
| HADOOP_CONF_DIR | Hadoop configuration path | Required |
| SQOOP_HOME | Sqoop installation path | Not required |
| SQOOP_CONF_DIR | Sqoop configuration path | Not required |
| HCAT_HOME | HCatalog installation path | Not required |
| HBASE_HOME | HBase installation path | Not required |

Table 1-1 Environment configuration list
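A minimal pre-flight check for the variables in Table 1-1 could look like the following Scala sketch. The object and method names are hypothetical and not part of Linkis; it reads the current process environment, so run it as the same user that starts the engine.

```scala
object EnvCheck {
  // Variables marked "Required" in Table 1-1.
  val required: Seq[String] = Seq("JAVA_HOME", "HADOOP_HOME", "HADOOP_CONF_DIR")
  // Variables marked "Not required": reported, but not fatal.
  val optional: Seq[String] = Seq("SQOOP_HOME", "SQOOP_CONF_DIR", "HCAT_HOME", "HBASE_HOME")

  // Return the names from `names` that are absent or blank in `env`.
  def missing(env: Map[String, String], names: Seq[String]): Seq[String] =
    names.filter(n => env.get(n).forall(_.trim.isEmpty))

  def main(args: Array[String]): Unit = {
    val env = sys.env
    missing(env, optional).foreach(n => println(s"optional variable not set: $n"))
    val missingRequired = missing(env, required)
    if (missingRequired.nonEmpty) {
      println(s"missing required variables: ${missingRequired.mkString(", ")}")
      sys.exit(1)
    }
    println("all required variables are set")
  }
}
```

Passing the environment in as a `Map` keeps `missing` easy to test without touching the real process environment.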

| Linkis Parameter Name | Parameter Content | Remark |
|---|---|---|
| wds.linkis.hadoop.site.xml | Locations of the Hadoop configuration files that Sqoop loads | Required. Example: "/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml" |
| sqoop.fetch.status.interval | Interval for fetching the Sqoop execution status | Not required; the default value is 5s |
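The Scala sketch below shows one way a client-side script could sanity-check these two values before jobs are submitted. It is not how the engine itself reads them: the helper names are hypothetical, and it assumes the interval is written with a unit suffix such as `5s`.

```scala
import java.nio.file.{Files, Paths}
import scala.concurrent.duration._

object SqoopLinkisParams {
  // Split the semicolon-separated value of wds.linkis.hadoop.site.xml
  // into individual file paths, trimming blanks and dropping empty entries.
  def siteXmlFiles(value: String): Seq[String] =
    value.split(";").map(_.trim).filter(_.nonEmpty).toSeq

  // Return the configured files that do not exist on this node.
  def missingFiles(paths: Seq[String]): Seq[String] =
    paths.filterNot(p => Files.exists(Paths.get(p)))

  // Interpret sqoop.fetch.status.interval, falling back to the documented 5s default
  // when it is unset or not finite. Assumption: values look like "5s" or "10s";
  // a malformed value makes Duration(...) throw NumberFormatException.
  def fetchInterval(value: Option[String]): FiniteDuration =
    value
      .map(v => Duration(v.trim))
      .collect { case d: FiniteDuration => d }
      .getOrElse(5.seconds)
}
```

For example, `SqoopLinkisParams.missingFiles(SqoopLinkisParams.siteXmlFiles(value))` lists the site files that are configured but missing on the node.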

2. Sqoop Engine configuration and deployment

2.1 Sqoop version selection and compilation

Linkis 1.1.2 and above supports the mainstream Sqoop versions 1.4.6 and 1.4.7. Later versions may require some code changes and recompilation.

2.2 Sqoop EngineConn deployment and loading

Note: before compiling the Sqoop engine, the whole Linkis project must be compiled first.

Compile the Sqoop engine separately:

cd ${linkis_code_dir}linkis-engineconn-plugins/sqoop/
mvn clean install

To install, take the compiled engine package located at

${linkis_code_dir}linkis-engineconn-plugins/sqoop/target/sqoop-engineconn.zip

deploy it to

${LINKIS_HOME}/lib/linkis-engineplugins

and then restart linkis-engineplugin:

cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-engineplugin

More details on engine plugins can be found in the following article:
https://linkis.apache.org/zh-CN/docs/1.1.1/deployment/engine-conn-plugin-installation

3. Sqoop Engine Usage

3.1 OnceEngineConn

OnceEngineConn works as follows: the client calls LinkisManager's createEngineConn interface through LinkisManagerClient and sends the code to the newly created Sqoop engine, which then starts executing it. Other systems, such as Exchangis, can call Linkis in this way. Using the client is simple: create a new Maven project, or add the following dependency to your existing project:

<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
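If your client project is built with sbt rather than Maven, the same artifact can be declared as below. This is simply the sbt form of the Maven coordinates above; `linkisVersion` is a placeholder to set to the Linkis version you deploy, and it assumes that artifact version is available in a repository your build can reach.

```scala
// build.sbt (sketch): set linkisVersion to match your Linkis deployment, e.g. "1.2.0".
val linkisVersion = "1.2.0"

libraryDependencies += "org.apache.linkis" % "linkis-computation-client" % linkisVersion
```

A single `%` is used because linkis-computation-client is a Java-style artifact without a Scala version suffix.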

Test Case:


package com.webank.wedatasphere.exchangis.job.server.log.client

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
import org.apache.linkis.computation.client.utils.LabelKeyUtils

import scala.collection.JavaConverters._

object SqoopOnceJobTest extends App {
  // Address of the Linkis gateway
  LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
  val logPath = "C:\\Users\\resources\\log4j.properties"
  System.setProperty("log4j.configurationFile", logPath)

  // Startup parameters of the engine conn
  val startUpMap = new util.HashMap[String, Any]
  startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")

  val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
    .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
    .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
    .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
    .setStartupParams(startUpMap)
    .setMaxSubmitTime(30000)
    .addExecuteUser("freeuser")

  // Switch to exportJob(builder) to run the export example instead
  val onceJob = importJob(builder)
  val time = System.currentTimeMillis()
  onceJob.submit()
  println(onceJob.getId)

  // Operators that fetch the logs, progress and metrics of the running engine conn
  val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
  println(onceJob.getECMServiceInstance)
  logOperator.setFromLine(0)
  logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
  logOperator.setEngineConnType("sqoop")
  logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
  val progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
  var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]

  // Poll while the job runs; once it completes, keep reading until a log page comes back empty
  var end = false
  var rowBefore = 1
  while (!end || rowBefore > 0) {
    if (onceJob.isCompleted) {
      end = true
      metricOperator = null
    }
    logOperator.setPageSize(100)
    Utils.tryQuietly {
      val logs = logOperator.apply()
      logs.logs.asScala.foreach(log => println(log))
      rowBefore = logs.logs.size
    }
    Thread.sleep(3000)
    Option(metricOperator).foreach(operator => {
      if (!onceJob.isCompleted) {
        println(s"Metric Monitor: ${operator.apply()}")
        println(s"Progress: ${progressOperator.apply()}")
      }
    })
  }
  onceJob.waitForCompleted()
  println(onceJob.getStatus)
  println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
  System.exit(0)

  // Import: MySQL query result -> HCatalog (Hive) table partition
  def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
      .addJobContent("sqoop.mode", "import")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      .addJobContent("sqoop.args.username", "free")
      .addJobContent("sqoop.args.password", "testpwd")
      .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
        " exchangis where sno =1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "freedb")
      .addJobContent("sqoop.args.hcatalog.table", "zy_test")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "3")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }

  // Export: HCatalog (Hive) table -> MySQL table (Sqoop export takes --table, not --query)
  def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
      .addJobContent("sqoop.mode", "export")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      .addJobContent("sqoop.args.username", "free")
      .addJobContent("sqoop.args.password", "testpwd")
      .addJobContent("sqoop.args.table", "exchangis_table")
      .addJobContent("sqoop.args.hcatalog.database", "hadoop")
      .addJobContent("sqoop.args.hcatalog.table", "partition_33")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "4")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }
}
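The log loop in the test case keeps reading after the job has finished, which can be hard to follow. The self-contained Scala sketch below isolates only that termination logic; `PollableJob` is a hypothetical stand-in for the once job plus its log operator, not a Linkis API.

```scala
object LogPollingSketch {
  // Hypothetical stand-in for the once job and its EngineConnLogOperator.
  trait PollableJob {
    def isCompleted: Boolean
    // Return the next page of log lines (empty when nothing new is available).
    def nextLogPage(pageSize: Int): Seq[String]
  }

  // Same condition as the test case: keep polling while the job is running,
  // and after it completes keep reading until a page comes back empty,
  // so log lines written at the very end are not lost.
  def drainLogs(job: PollableJob, pageSize: Int, sleepMillis: Long)(emit: String => Unit): Unit = {
    var end = false
    var rowsLastPage = 1
    while (!end || rowsLastPage > 0) {
      if (job.isCompleted) end = true
      val page = job.nextLogPage(pageSize)
      page.foreach(emit)
      rowsLastPage = page.size
      Thread.sleep(sleepMillis)
    }
  }
}
```

Initialising `rowsLastPage` to 1 guarantees at least one fetch even if the job is already complete on the first check.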

Parameter comparison table (Linkis parameter <===> native Sqoop parameter):

sqoop.env.mapreduce.job.queuename <===> -Dmapreduce.job.queuename
sqoop.args.connection.manager <===> --connection-manager
sqoop.args.connection.param.file <===> --connection-param-file
sqoop.args.driver <===> --driver
sqoop.args.hadoop.home <===> --hadoop-home
sqoop.args.hadoop.mapred.home <===> --hadoop-mapred-home
sqoop.args.help <===> help
sqoop.args.password <===> --password
sqoop.args.password.alias <===> --password-alias
sqoop.args.password.file <===> --password-file
sqoop.args.relaxed.isolation <===> --relaxed-isolation
sqoop.args.skip.dist.cache <===> --skip-dist-cache
sqoop.args.username <===> --username
sqoop.args.verbose <===> --verbose
sqoop.args.append <===> --append
sqoop.args.as.avrodatafile <===> --as-avrodatafile
sqoop.args.as.parquetfile <===> --as-parquetfile
sqoop.args.as.sequencefile <===> --as-sequencefile
sqoop.args.as.textfile <===> --as-textfile
sqoop.args.autoreset.to.one.mapper <===> --autoreset-to-one-mapper
sqoop.args.boundary.query <===> --boundary-query
sqoop.args.case.insensitive <===> --case-insensitive
sqoop.args.columns <===> --columns
sqoop.args.compression.codec <===> --compression-codec
sqoop.args.delete.target.dir <===> --delete-target-dir
sqoop.args.direct <===> --direct
sqoop.args.direct.split.size <===> --direct-split-size
sqoop.args.query <===> --query
sqoop.args.fetch.size <===> --fetch-size
sqoop.args.inline.lob.limit <===> --inline-lob-limit
sqoop.args.num.mappers <===> --num-mappers
sqoop.args.mapreduce.job.name <===> --mapreduce-job-name
sqoop.args.merge.key <===> --merge-key
sqoop.args.split.by <===> --split-by
sqoop.args.table <===> --table
sqoop.args.target.dir <===> --target-dir
sqoop.args.validate <===> --validate
sqoop.args.validation.failurehandler <===> --validation-failurehandler
sqoop.args.validation.threshold <===> --validation-threshold
sqoop.args.validator <===> --validator
sqoop.args.warehouse.dir <===> --warehouse-dir
sqoop.args.where <===> --where
sqoop.args.compress <===> --compress
sqoop.args.check.column <===> --check-column
sqoop.args.incremental <===> --incremental
sqoop.args.last.value <===> --last-value
sqoop.args.enclosed.by <===> --enclosed-by
sqoop.args.escaped.by <===> --escaped-by
sqoop.args.fields.terminated.by <===> --fields-terminated-by
sqoop.args.lines.terminated.by <===> --lines-terminated-by
sqoop.args.mysql.delimiters <===> --mysql-delimiters
sqoop.args.optionally.enclosed.by <===> --optionally-enclosed-by
sqoop.args.input.enclosed.by <===> --input-enclosed-by
sqoop.args.input.escaped.by <===> --input-escaped-by
sqoop.args.input.fields.terminated.by <===> --input-fields-terminated-by
sqoop.args.input.lines.terminated.by <===> --input-lines-terminated-by
sqoop.args.input.optionally.enclosed.by <===> --input-optionally-enclosed-by
sqoop.args.create.hive.table <===> --create-hive-table
sqoop.args.hive.delims.replacement <===> --hive-delims-replacement
sqoop.args.hive.database <===> --hive-database
sqoop.args.hive.drop.import.delims <===> --hive-drop-import-delims
sqoop.args.hive.home <===> --hive-home
sqoop.args.hive.import <===> --hive-import
sqoop.args.hive.overwrite <===> --hive-overwrite
sqoop.args.hive.partition.value <===> --hive-partition-value
sqoop.args.hive.table <===> --hive-table
sqoop.args.column.family <===> --column-family
sqoop.args.hbase.bulkload <===> --hbase-bulkload
sqoop.args.hbase.create.table <===> --hbase-create-table
sqoop.args.hbase.row.key <===> --hbase-row-key
sqoop.args.hbase.table <===> --hbase-table
sqoop.args.hcatalog.database <===> --hcatalog-database
sqoop.args.hcatalog.home <===> --hcatalog-home
sqoop.args.hcatalog.partition.keys <===> --hcatalog-partition-keys
sqoop.args.hcatalog.partition.values <===> --hcatalog-partition-values
sqoop.args.hcatalog.table <===> --hcatalog-table
sqoop.args.hive.partition.key <===> --hive-partition-key
sqoop.args.map.column.hive <===> --map-column-hive
sqoop.args.create.hcatalog.table <===> --create-hcatalog-table
sqoop.args.hcatalog.storage.stanza <===> --hcatalog-storage-stanza
sqoop.args.accumulo.batch.size <===> --accumulo-batch-size
sqoop.args.accumulo.column.family <===> --accumulo-column-family
sqoop.args.accumulo.create.table <===> --accumulo-create-table
sqoop.args.accumulo.instance <===> --accumulo-instance
sqoop.args.accumulo.max.latency <===> --accumulo-max-latency
sqoop.args.accumulo.password <===> --accumulo-password
sqoop.args.accumulo.row.key <===> --accumulo-row-key
sqoop.args.accumulo.table <===> --accumulo-table
sqoop.args.accumulo.user <===> --accumulo-user
sqoop.args.accumulo.visibility <===> --accumulo-visibility
sqoop.args.accumulo.zookeepers <===> --accumulo-zookeepers
sqoop.args.bindir <===> --bindir
sqoop.args.class.name <===> --class-name
sqoop.args.input.null.non.string <===> --input-null-non-string
sqoop.args.input.null.string <===> --input-null-string
sqoop.args.jar.file <===> --jar-file
sqoop.args.map.column.java <===> --map-column-java
sqoop.args.null.non.string <===> --null-non-string
sqoop.args.null.string <===> --null-string
sqoop.args.outdir <===> --outdir
sqoop.args.package.name <===> --package-name
sqoop.args.conf <===> -conf
sqoop.args.D <===> -D
sqoop.args.fs <===> -fs
sqoop.args.jt <===> -jt
sqoop.args.files <===> -files
sqoop.args.libjars <===> -libjars
sqoop.args.archives <===> -archives
sqoop.args.update.key <===> --update-key
sqoop.args.update.mode <===> --update-mode
sqoop.args.export.dir <===> --export-dir
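Most rows in this table follow one naming rule: drop the `sqoop.args.` prefix, replace dots with dashes and prepend `--`, while `sqoop.env.` keys become `-D` Hadoop properties. The Scala sketch below encodes that rule plus the exceptions visible in the table (`help`, and the single-dash generic Hadoop options). It is derived only from this table and is not the engine's own conversion code; `SqoopArgMapping` is a hypothetical name.

```scala
object SqoopArgMapping {
  private val ArgsPrefix = "sqoop.args."
  private val EnvPrefix = "sqoop.env."
  // Generic Hadoop options that the table maps to a single dash.
  private val SingleDash = Set("conf", "D", "fs", "jt", "files", "libjars", "archives")

  // Translate one Linkis job-content key into the native flag listed in the table.
  // Keys outside the sqoop.args. / sqoop.env. namespaces (e.g. sqoop.mode) yield None.
  def nativeFlag(key: String): Option[String] =
    if (key.startsWith(EnvPrefix)) Some("-D" + key.stripPrefix(EnvPrefix))
    else if (key.startsWith(ArgsPrefix)) {
      val name = key.stripPrefix(ArgsPrefix)
      if (name == "help") Some("help")                 // listed as-is in the table
      else if (SingleDash(name)) Some("-" + name)      // -conf, -D, -fs, ...
      else Some("--" + name.replace('.', '-'))         // num.mappers -> --num-mappers
    } else None
}
```

For example, `SqoopArgMapping.nativeFlag("sqoop.args.hcatalog.partition.keys")` gives `Some("--hcatalog-partition-keys")`, matching the corresponding row above.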