Sqoop Engine usage documentation
This article introduces the configuration, deployment, and use of the Sqoop engine in Linkis 1.X.
1. Sqoop engine Linkis system parameter configuration
The Sqoop engine depends mainly on the basic Hadoop environment, so any node on which the Sqoop engine is deployed must also have the Hadoop client environment deployed.
Before running Sqoop tasks through Linkis, it is strongly recommended that you run a test task with native Sqoop on the node to check that the node environment is working properly.
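The native smoke test can also be triggered from the JVM. This is a minimal sketch, assuming that `sqoop` is on the PATH of the user who will run the engine; `NodeCheck` and `runsCleanly` are hypothetical names, not part of Linkis or Sqoop.

```scala
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

// Hypothetical helper for smoke-testing native commands on a node before using Linkis.
object NodeCheck {
  /** True only if the command could be started and exited with status 0; its output is discarded. */
  def runsCleanly(cmd: Seq[String]): Boolean = {
    val quiet = ProcessLogger((_: String) => ())
    // Process(...).! throws an IOException when the executable cannot be found;
    // Try turns that into a failure, which counts as "not healthy".
    Try(Process(cmd).!(quiet)).toOption.contains(0)
  }
}
```

For example, `NodeCheck.runsCleanly(Seq("sqoop", "version"))` only confirms that the Sqoop client starts; a small native import against your database is a stronger check of the node.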
| Environment Variable Name | Environment Variable Content | Remark |
|---|---|---|
| JAVA_HOME | JDK installation path | Required |
| HADOOP_HOME | Hadoop installation path | Required |
| HADOOP_CONF_DIR | Hadoop config path | Required |
| SQOOP_HOME | Sqoop installation path | Not Required |
| SQOOP_CONF_DIR | Sqoop config path | Not Required |
| HCAT_HOME | HCatalog installation path | Not Required |
| HBASE_HOME | HBase installation path | Not Required |
Table 1-1 Environment configuration list
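The variables in Table 1-1 can be checked before deployment with a sketch like the one below. `EnvCheck` is a hypothetical helper; it only tests whether each variable is set and non-blank, not whether the path it points to is valid.

```scala
// Hypothetical pre-deployment check for the variables listed in Table 1-1.
object EnvCheck {
  val required: Seq[String] = Seq("JAVA_HOME", "HADOOP_HOME", "HADOOP_CONF_DIR")
  val optional: Seq[String] = Seq("SQOOP_HOME", "SQOOP_CONF_DIR", "HCAT_HOME", "HBASE_HOME")

  /** Names from `names` that are unset or blank in `env`, in their original order. */
  def missing(names: Seq[String], env: Map[String, String]): Seq[String] =
    names.filter(name => env.get(name).forall(_.trim.isEmpty))
}
```

On a real node, `EnvCheck.missing(EnvCheck.required, sys.env)` returning a non-empty list means the node is not ready for the Sqoop engine.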
| Linkis Parameter Name | Parameter Content | Remark |
|---|---|---|
| wds.linkis.hadoop.site.xml | Locations of the Hadoop configuration files that Sqoop loads | Required. Example: "/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml" |
| sqoop.fetch.status.interval | Interval at which the Sqoop execution status is fetched | Not required; defaults to 5s |
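`wds.linkis.hadoop.site.xml` takes a semicolon-separated list of files. The sketch below, assuming exactly that format, splits the value and reports any listed file that is missing on the node; `SiteXmlConfig` is a hypothetical helper, not Linkis code.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper for the value of wds.linkis.hadoop.site.xml.
object SiteXmlConfig {
  /** Splits the semicolon-separated value into paths, ignoring blank entries and surrounding spaces. */
  def parse(value: String): Seq[Path] =
    value.split(";").toSeq.map(_.trim).filter(_.nonEmpty).map(p => Paths.get(p))

  /** Listed paths that are not regular files on this node. */
  def missingFiles(value: String): Seq[Path] =
    parse(value).filterNot(p => Files.isRegularFile(p))
}
```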
2. Sqoop Engine configuration and deployment
2.1 Sqoop version selection and compilation
Linkis 1.1.2 and above supports the mainstream Sqoop versions 1.4.6 and 1.4.7; later versions may require code changes and recompilation.
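The client example in section 3.1 selects the engine with the label value `sqoop-1.4.6`. Assuming labels follow that `sqoop-<version>` pattern and must match the version the engine package was built against, a hypothetical guard for the supported versions could look like this:

```scala
// Hypothetical guard: map a Sqoop version to an engine-type label value of the form "sqoop-<version>".
object SqoopVersion {
  // Versions listed above as supported by Linkis 1.1.2 and above.
  val supported: Set[String] = Set("1.4.6", "1.4.7")

  /** Right(label) for a supported version, Left(reason) otherwise. */
  def engineTypeLabel(version: String): Either[String, String] = {
    val v = version.trim
    if (supported.contains(v)) Right(s"sqoop-$v")
    else Left(s"Sqoop $v is not in the supported list; code changes and a rebuild may be needed")
  }
}
```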
2.2 Sqoop engineConn deployment and loading
Note: Before compiling the Sqoop engine, the Linkis project must be fully compiled.
Compile Sqoop separately:

```shell
cd ${linkis_code_dir}/linkis-engineconn-plugins/sqoop/
mvn clean install
```

To install, take the compiled engine package, located at

```shell
${linkis_code_dir}/linkis-engineconn-plugins/sqoop/target/sqoop-engineconn.zip
```

deploy it to ${LINKIS_HOME}/lib/linkis-engineplugins, and restart linkis-engineplugin:

```shell
cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-engineplugin
```

More details on engineplugin can be found in the article EngineConnPlugin Installation.
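The copy step can also be scripted. This is a minimal sketch that assumes "deploy" means copying `sqoop-engineconn.zip` unchanged into the plugin directory; `EnginePluginDeploy` is a hypothetical name, and the restart with `linkis-daemon.sh` is still required afterwards.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical deployment helper: copies the engine package into the engine plugin directory.
object EnginePluginDeploy {
  /** Copies `enginePackage` into `pluginDir` (created if absent), overwriting an older copy; returns the target path. */
  def deploy(enginePackage: Path, pluginDir: Path): Path = {
    require(Files.isRegularFile(enginePackage), s"engine package not found: $enginePackage")
    Files.createDirectories(pluginDir)
    val target = pluginDir.resolve(enginePackage.getFileName)
    Files.copy(enginePackage, target, StandardCopyOption.REPLACE_EXISTING)
  }
}
```

With `${linkis_code_dir}` and `${LINKIS_HOME}` resolved to `codeDir` and `linkisHome`, the call would be `EnginePluginDeploy.deploy(codeDir.resolve("linkis-engineconn-plugins/sqoop/target/sqoop-engineconn.zip"), linkisHome.resolve("lib/linkis-engineplugins"))`.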
3. Sqoop Engine Usage
3.1 OnceEngineConn
OnceEngineConn is used by calling the createEngineConn interface of LinkisManager through LinkisManagerClient and sending the code to the created Sqoop engine, which then starts executing it. Other systems, such as Exchangis, can call Linkis in this way. Using the client is simple: create a new Maven project, or add the following dependency to your existing project:

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
```

Test case:
```scala
package com.webank.wedatasphere.exchangis.job.server.log.client

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
import org.apache.linkis.computation.client.utils.LabelKeyUtils

import scala.collection.JavaConverters._

object SqoopOnceJobTest extends App {
  // Address of the Linkis gateway
  LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
  val logPath = "C:\\Users\\resources\\log4j.properties"
  System.setProperty("log4j.configurationFile", logPath)

  // Startup parameters passed to the engine
  val startUpMap = new util.HashMap[String, Any]
  startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")

  val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
    .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
    .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
    .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
    .setStartupParams(startUpMap)
    .setMaxSubmitTime(30000)
    .addExecuteUser("freeuser")
  val onceJob = importJob(builder)
  val time = System.currentTimeMillis()
  onceJob.submit()
  println(onceJob.getId)

  // Operators for pulling the engine's log, progress and metrics
  val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
  println(onceJob.getECMServiceInstance)
  logOperator.setFromLine(0)
  logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
  logOperator.setEngineConnType("sqoop")
  logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
  val progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
  var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
  var end = false
  var rowBefore = 1
  // Poll until the job has completed and no further log lines are returned
  while (!end || rowBefore > 0) {
    if (onceJob.isCompleted) {
      end = true
      metricOperator = null
    }
    logOperator.setPageSize(100)
    Utils.tryQuietly {
      val logs = logOperator.apply()
      logs.logs.asScala.foreach(log => println(log))
      rowBefore = logs.logs.size
    }
    Thread.sleep(3000)
    Option(metricOperator).foreach(operator => {
      if (!onceJob.isCompleted) {
        println(s"Metric Monitor: ${operator.apply()}")
        println(s"Progress: ${progressOperator.apply()}")
      }
    })
  }
  onceJob.waitForCompleted()
  println(onceJob.getStatus)
  println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
  System.exit(0)

  // Import: MySQL query result -> Hive table (through HCatalog)
  def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
      .addJobContent("sqoop.mode", "import")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      .addJobContent("sqoop.args.username", "free")
      .addJobContent("sqoop.args.password", "testpwd")
      .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
        " exchangis where sno =1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "freedb")
      .addJobContent("sqoop.args.hcatalog.table", "zy_test")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "3")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }

  // Export: Hive table (through HCatalog) -> existing MySQL table
  def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
      .addJobContent("sqoop.mode", "export")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      // Sqoop export writes into an existing table; --query is only valid for import
      .addJobContent("sqoop.args.table", "exchangis_table")
      .addJobContent("sqoop.args.hcatalog.database", "hadoop")
      .addJobContent("sqoop.args.hcatalog.table", "partition_33")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "4")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }
}
```

Parameter comparison table (Linkis parameters vs. native Sqoop parameters):
| Linkis Parameter | Native Sqoop Parameter |
|---|---|
| sqoop.env.mapreduce.job.queuename | -Dmapreduce.job.queuename |
| sqoop.args.connection.manager | --connection-manager |
| sqoop.args.connection.param.file | --connection-param-file |
| sqoop.args.driver | --driver |
| sqoop.args.hadoop.home | --hadoop-home |
| sqoop.args.hadoop.mapred.home | --hadoop-mapred-home |
| sqoop.args.help | help |
| sqoop.args.password | --password |
| sqoop.args.password.alias | --password-alias |
| sqoop.args.password.file | --password-file |
| sqoop.args.relaxed.isolation | --relaxed-isolation |
| sqoop.args.skip.dist.cache | --skip-dist-cache |
| sqoop.args.username | --username |
| sqoop.args.verbose | --verbose |
| sqoop.args.append | --append |
| sqoop.args.as.avrodatafile | --as-avrodatafile |
| sqoop.args.as.parquetfile | --as-parquetfile |
| sqoop.args.as.sequencefile | --as-sequencefile |
| sqoop.args.as.textfile | --as-textfile |
| sqoop.args.autoreset.to.one.mapper | --autoreset-to-one-mapper |
| sqoop.args.boundary.query | --boundary-query |
| sqoop.args.case.insensitive | --case-insensitive |
| sqoop.args.columns | --columns |
| sqoop.args.compression.codec | --compression-codec |
| sqoop.args.delete.target.dir | --delete-target-dir |
| sqoop.args.direct | --direct |
| sqoop.args.direct.split.size | --direct-split-size |
| sqoop.args.query | --query |
| sqoop.args.fetch.size | --fetch-size |
| sqoop.args.inline.lob.limit | --inline-lob-limit |
| sqoop.args.num.mappers | --num-mappers |
| sqoop.args.mapreduce.job.name | --mapreduce-job-name |
| sqoop.args.merge.key | --merge-key |
| sqoop.args.split.by | --split-by |
| sqoop.args.table | --table |
| sqoop.args.target.dir | --target-dir |
| sqoop.args.validate | --validate |
| sqoop.args.validation.failurehandler | --validation-failurehandler |
| sqoop.args.validation.threshold | --validation-threshold |
| sqoop.args.validator | --validator |
| sqoop.args.warehouse.dir | --warehouse-dir |
| sqoop.args.where | --where |
| sqoop.args.compress | --compress |
| sqoop.args.check.column | --check-column |
| sqoop.args.incremental | --incremental |
| sqoop.args.last.value | --last-value |
| sqoop.args.enclosed.by | --enclosed-by |
| sqoop.args.escaped.by | --escaped-by |
| sqoop.args.fields.terminated.by | --fields-terminated-by |
| sqoop.args.lines.terminated.by | --lines-terminated-by |
| sqoop.args.mysql.delimiters | --mysql-delimiters |
| sqoop.args.optionally.enclosed.by | --optionally-enclosed-by |
| sqoop.args.input.enclosed.by | --input-enclosed-by |
| sqoop.args.input.escaped.by | --input-escaped-by |
| sqoop.args.input.fields.terminated.by | --input-fields-terminated-by |
| sqoop.args.input.lines.terminated.by | --input-lines-terminated-by |
| sqoop.args.input.optionally.enclosed.by | --input-optionally-enclosed-by |
| sqoop.args.create.hive.table | --create-hive-table |
| sqoop.args.hive.delims.replacement | --hive-delims-replacement |
| sqoop.args.hive.database | --hive-database |
| sqoop.args.hive.drop.import.delims | --hive-drop-import-delims |
| sqoop.args.hive.home | --hive-home |
| sqoop.args.hive.import | --hive-import |
| sqoop.args.hive.overwrite | --hive-overwrite |
| sqoop.args.hive.partition.value | --hive-partition-value |
| sqoop.args.hive.table | --hive-table |
| sqoop.args.column.family | --column-family |
| sqoop.args.hbase.bulkload | --hbase-bulkload |
| sqoop.args.hbase.create.table | --hbase-create-table |
| sqoop.args.hbase.row.key | --hbase-row-key |
| sqoop.args.hbase.table | --hbase-table |
| sqoop.args.hcatalog.database | --hcatalog-database |
| sqoop.args.hcatalog.home | --hcatalog-home |
| sqoop.args.hcatalog.partition.keys | --hcatalog-partition-keys |
| sqoop.args.hcatalog.partition.values | --hcatalog-partition-values |
| sqoop.args.hcatalog.table | --hcatalog-table |
| sqoop.args.hive.partition.key | --hive-partition-key |
| sqoop.args.map.column.hive | --map-column-hive |
| sqoop.args.create.hcatalog.table | --create-hcatalog-table |
| sqoop.args.hcatalog.storage.stanza | --hcatalog-storage-stanza |
| sqoop.args.accumulo.batch.size | --accumulo-batch-size |
| sqoop.args.accumulo.column.family | --accumulo-column-family |
| sqoop.args.accumulo.create.table | --accumulo-create-table |
| sqoop.args.accumulo.instance | --accumulo-instance |
| sqoop.args.accumulo.max.latency | --accumulo-max-latency |
| sqoop.args.accumulo.password | --accumulo-password |
| sqoop.args.accumulo.row.key | --accumulo-row-key |
| sqoop.args.accumulo.table | --accumulo-table |
| sqoop.args.accumulo.user | --accumulo-user |
| sqoop.args.accumulo.visibility | --accumulo-visibility |
| sqoop.args.accumulo.zookeepers | --accumulo-zookeepers |
| sqoop.args.bindir | --bindir |
| sqoop.args.class.name | --class-name |
| sqoop.args.input.null.non.string | --input-null-non-string |
| sqoop.args.input.null.string | --input-null-string |
| sqoop.args.jar.file | --jar-file |
| sqoop.args.map.column.java | --map-column-java |
| sqoop.args.null.non.string | --null-non-string |
| sqoop.args.null.string | --null-string |
| sqoop.args.outdir | --outdir |
| sqoop.args.package.name | --package-name |
| sqoop.args.conf | -conf |
| sqoop.args.D | -D |
| sqoop.args.fs | -fs |
| sqoop.args.jt | -jt |
| sqoop.args.files | -files |
| sqoop.args.libjars | -libjars |
| sqoop.args.archives | -archives |
| sqoop.args.update.key | --update-key |
| sqoop.args.update.mode | --update-mode |
| sqoop.args.export.dir | --export-dir |
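Most rows of the parameter comparison table follow one naming pattern: drop the `sqoop.args.` prefix and replace the dots with dashes behind `--`, while `sqoop.env.` keys become `-D` properties. The sketch below encodes that pattern, hard-coding the exceptions visible in the table (`help` and the single-dash generic Hadoop options). It is an illustration of the table under that assumption, not the Sqoop engine's actual conversion code, and it does not handle option values.

```scala
// Hypothetical illustration of the naming pattern in the parameter comparison table.
object SqoopArgMapping {
  private val ArgsPrefix = "sqoop.args."
  private val EnvPrefix = "sqoop.env."
  // Generic Hadoop options that the table maps to a single dash.
  private val SingleDash = Set("conf", "D", "fs", "jt", "files", "libjars", "archives")

  /** Native option for a Linkis job-content key, or None if the key is not an argument (e.g. sqoop.mode). */
  def toNative(key: String): Option[String] =
    if (key.startsWith(EnvPrefix)) Some("-D" + key.stripPrefix(EnvPrefix))
    else if (key.startsWith(ArgsPrefix)) {
      val name = key.stripPrefix(ArgsPrefix)
      if (name == "help") Some("help")
      else if (SingleDash.contains(name)) Some("-" + name)
      else Some("--" + name.replace('.', '-'))
    } else None
}
```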