赞
踩
原作:https://blog.csdn.net/weixin_42003671/article/details/97630151
原作很详细了但是基本都是test阶段的东西,也踩了很多坑才走出来
这边尽力写出自己遇到的问题
先贴代码
**
**
import java.sql.{Connection, DriverManager, ResultSet} import java.util.Properties import scala.collection.mutable import com.aliyun.odps.TableSchema import com.aliyun.odps.data.Record import org.apache.spark.{SparkContext, SparkConf} object MysqlUtil { // val url = "jdbc:mysql://rm-mariadb-prod.spmore.com:3309/data_server" // val user = "sps_wzy" // val password = "wzy_rootr" // val props = new Properties() // props.put("user", user) // props.put("password", password) // props.setProperty("useSSL", "false") // props.setProperty("useUnicode", "true") // props.setProperty("characterEncoding", "utf8") // var connection: Connection = null import java.sql.Connection val url = "jdbc:postgresql://hgmc-cn-zvp27cmdw002-cn-beijing.hologres.aliyuncs.com:80/po_total?user=LTAI5qwr125wqer764sdf3HfwXw3JN&password=12095asdqwrqeweqwqwrtqweqw4712qwqte0941" val conn = DriverManager.getConnection(url) val st = conn.createStatement var connection: Connection = null //这里是连接阿里云hologre外部表的jdbc import java.sql.Connection import java.sql.DriverManager import java.sql.SQLException val accessId = "LTA1eqw125BxfBg5CHfwXw3JN" val accessKey = "OqF8np0uH请问安发桥委屈DKCD3b40vkCY" val MaxUrl = "jdbc:odps:http://service.cn-nanjing.maxcompute.aliyun.com/api?project=wzy_bbbb" val MaxConn = DriverManager.getConnection(MaxUrl,accessId,accessKey) val MaxStatement = MaxConn.createStatement var MaxConnection: Connection = null //这里是连接阿里云maxcomputer的jdbc /** * 定义查询操作,这边和原作有区别的地方在 * var resMap = mutable.ListBuffer[Any]() * 原作是var resMap = mutable.Map[String,String]() * 因为需要获取到的数据不再是原作中简单的两个字符串,而是数据库里面各种类型的数据 所以用了mutable的listbuffer 然后又类型改成any就行 * 其余都是基本的sql语句 没有太难的地方 有疑惑也可以一起交流 */ def AmPage(po_status : String, new_sps_no : String,pageSize : Int) ={ var resMap = mutable.ListBuffer[Any]() try { classOf[com.mysql.jdbc.Driver] connection = DriverManager.getConnection(url) val sql = s""" |select |po_status, |po_date, |new_sps_no, |buyer_ref_no, |equipment_en, |buyer_company_name, |buyeram, |buyerbd, |outside_total_amount, |po_declaration_remarks from data_server.dws_am_po_total |where po_status = "$po_status" and new_sps_no='$new_sps_no' limit '$pageSize' """.stripMargin val resSet: ResultSet = connection.createStatement().executeQuery(sql) while(resSet.next()){ val po_status: String = resSet.getString("po_status") val po_date: String = resSet.getString("po_date") val new_sps_no: String = resSet.getString("new_sps_no") val buyer_ref_no: String = resSet.getString("buyer_ref_no") val equipment_en: String = resSet.getString("equipment_en") val buyer_company_name: String = resSet.getString("buyer_company_name") val buyeram: String = resSet.getString("buyeram") val buyerbd: String = resSet.getString("buyerbd") val outside_total_amount: String = resSet.getString("outside_total_amount") val po_declaration_remarks: String = resSet.getString("po_declaration_remarks") resMap += po_status -> po_date -> new_sps_no -> buyer_ref_no -> equipment_en -> buyer_company_name -> buyeram -> buyerbd -> outside_total_amount -> po_declaration_remarks } } catch { case e: Exception => println(e.printStackTrace()) } finally { connection.close() } resMap } def originalRfq(new_sps_no : String) ={ var resMap = mutable.ListBuffer[Any]() try { classOf[com.mysql.jdbc.Driver] connection = DriverManager.getConnection(url) val sql = s""" |select new_sps_no, |buyer_ref_no, |rfq_created_time, |equipment_en, |buyer_company_ename, |am_name from data_server.dws_rfq_original |where new_sps_no = "$new_sps_no" """.stripMargin val resSet: ResultSet = connection.createStatement().executeQuery(sql) while(resSet.next()){ val new_sps_no: String = resSet.getString("new_sps_no") val buyer_ref_no: String = resSet.getString("buyer_ref_no") val rfq_created_time: String = resSet.getString("rfq_created_time") val equipment_en: String = resSet.getString("equipment_en") val buyer_company_ename: String = resSet.getString("buyer_company_ename") val am_name: String = resSet.getString("am_name") resMap += new_sps_no -> buyer_ref_no -> rfq_created_time -> equipment_en -> buyer_company_ename -> am_name } } catch { case e: Exception => println(e.printStackTrace()) } finally { connection.close() } resMap } /* 这里有一个表join的操作,要注意的是所有的字段一定要给上别名 不然后面会报错 具体什么错误我忘了 但是给上就一定不会措 */ def SellerPo(new_sps_no : String) ={ var resMap = mutable.ListBuffer[Any]() try { classOf[com.mysql.jdbc.Driver] connection = DriverManager.getConnection(url) val sql = s""" |select |s.new_sps_no as new_sps_no, |equipment_en as new_sps_no, |buyer_ref_no as buyer_ref_no, |buyer_company_name as buyer_company_name, |seller_company_name as seller_company_name, |outside_delivery_time as outside_delivery_time, |outside_total_amount as outside_total_amount, |outside_term_payment_id as outside_term_payment_id, |sellerbd as sellerbd, |selleram as selleram, |commission as commission, |contract_currency as contract_currency, |to_usd_rate as to_usd_rate, |usd_amount as usd_amount, |po_status as po_status, |po_effective_date as po_effective_date, |po_stock_completed_date as po_stock_completed_date, |po_notice_delivery_date as po_notice_delivery_date, |f.form_date as form_date, |f.form_amount as form_amount, |f.form_no as form_no, |f.remarks as form_remarks, |p1.due_date as due_date, |p1.due_amount as due_amount, |p1.actually_date as actually_date, |p1.actually_amount as actually_amount, |p1.remarks as payment_remarks |from dws_seller_po s |left join dws_form f on s.new_sps_no = f.new_sps_no |left join dws_payment p1 on s.new_sps_no = p1.new_sps_no |where dws_seller_po.new_sps_no = "$new_sps_no" """.stripMargin val resSet: ResultSet = connection.createStatement().executeQuery(sql) while(resSet.next()){ val new_sps_no: String = resSet.getString("new_sps_no") val equipment_en: String = resSet.getString("equipment_en") val buyer_ref_no: String = resSet.getString("buyer_ref_no") val buyer_company_name: String = resSet.getString("buyer_company_name") val seller_company_name: String = resSet.getString("seller_company_name") val outside_delivery_time: String = resSet.getString("outside_delivery_time") val outside_total_amount: String = resSet.getString("outside_total_amount") val outside_term_payment_id: String = resSet.getString("outside_term_payment_id") val sellerbd: String = resSet.getString("sellerbd") val selleram: String = resSet.getString("selleram") val commission: String = resSet.getString("commission") val contract_currency: String = resSet.getString("contract_currency") val to_usd_rate: String = resSet.getString("to_usd_rate") val usd_amount: String = resSet.getString("usd_amount") val po_status: String = resSet.getString("po_status") val po_effective_date: String = resSet.getString("po_effective_date") val po_stock_completed_date: String = resSet.getString("po_stock_completed_date") val po_notice_delivery_date: String = resSet.getString("po_notice_delivery_date") val form_date: String = resSet.getString("form_date") val form_amount: String = resSet.getString("form_amount") val form_no: String = resSet.getString("form_no") val form_remarks: String = resSet.getString("form_remarks") val due_date: String = resSet.getString("due_date") val due_amount: String = resSet.getString("due_amount") val actually_date: String = resSet.getString("actually_date") val actually_amount: String = resSet.getString("actually_amount") val payment_remarks: String = resSet.getString("payment_remarks") resMap += new_sps_no -> equipment_en -> buyer_ref_no -> buyer_company_name -> seller_company_name -> outside_delivery_time -> outside_total_amount -> outside_term_payment_id -> sellerbd -> selleram -> commission -> contract_currency -> to_usd_rate -> usd_amount -> po_status -> po_effective_date -> po_stock_completed_date -> po_notice_delivery_date -> form_date -> form_amount -> form_no -> form_remarks -> due_date -> due_amount -> actually_date -> actually_amount -> payment_remarks } } catch { case e: Exception => println(e.printStackTrace()) } finally { connection.close() } resMap } /** * update * maxcomputer表 * max表建表需要添加 tblproperties("transactional"="true"); * 因为我这边开通的hologres是外部加速版 所以不能直接写入holo * 所以测试写进max表 * 例: drop table t5; create table t5(id String,name String,age String) tblproperties("transactional"="true"); insert into t5 values ("1",'wzy',"123"),("2",'css','1125'),('3','pt','125645') * 需要注意数据库数据类型与传参数据类型对应 */ def updateAmSellerPo( id :String, name :String, age :String ) ={ try { classOf[com.aliyun.odps.jdbc.OdpsDriver] MaxConnection = DriverManager.getConnection(MaxUrl,accessId,accessKey) val sql = s""" |update t5 set |name = '$name',age = '$age' |where id = '$id' |""".stripMargin val resSet: Int = MaxConnection.createStatement().executeUpdate(sql) println(resSet) } catch { case e: Exception => println(e.printStackTrace()) } finally { MaxConnection.close() } } /* inser没什么好说的 就是普通的sql 看懂就行 */ def insertAmSellerPo( new_sps_no :String, buyer_company_name :String, buyer_po_no :String, am_name :String, seller_company_name :String, seller_bd_name :String, equipment :String, delivery_lead_time :String, po_status:String, delivery_term :String, po_amount :String, po_currency :String, commission :String, po_exchange_rate :String, po_issue_date :String, po_efftive_date :String, po_notice_delivery_date :String, po_receiving_date :String, receivables_amount_usd:String, received_amount_usd:String, receivables_date:String, due_date:String, po_date:String, payment_remarks:String ) ={ try { classOf[com.aliyun.odps.jdbc.OdpsDriver] MaxConnection = DriverManager.getConnection(url,props) val sql = s""" |insert into dws_seller_po ( |new_sps_no, |buyer_company_name, |buyer_po_no, |am_name, |po_status, |seller_company_name, |seller_bd_name, |equipment, |delivery_lead_time, |delivery_term, |po_amount, |po_currency, |commission, |po_exchange_rate, |po_issue_date, |po_efftive_date, |po_notice_delivery_date, |po_receiving_date, |receivables_amount_usd, |received_amount_usd, |receivables_date, |due_date, |po_date, |payment_remarks) |values ( |'$new_sps_no', |'$buyer_company_name', |'$buyer_po_no', |'$am_name', |'$po_status', |'$seller_company_name', |'$seller_bd_name', |'$equipment', |'$delivery_lead_time', |'$delivery_term', |'$po_amount', |'$po_currency', |'$commission', |'$po_exchange_rate', |'$po_issue_date', |'$po_efftive_date', |'$po_notice_delivery_date', |'$po_receiving_date', |'$receivables_amount_usd', |'$received_amount_usd', |'$receivables_date', |'$due_date', |'$payment_remarks', |'$po_date') |""".stripMargin val resSet: Int = MaxConnection.createStatement().executeUpdate(sql) println(resSet) } catch { case e: Exception => println(e.printStackTrace()) } finally { MaxConnection.close() } } }
需要注意的点全部都写在代码里面了 喜欢一键copy的朋友也不用扒着网页看。
继续直接上代码
import java.net.URLDecoder import akka.actor.ActorSystem import akka.http.javadsl.server.RouteResults import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.http.scaladsl.server.{Route, RouteResult} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.headers.HttpOriginRange import ch.megard.akka.http.cors.scaladsl.CorsDirectives.cors import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings import org.json4s.DefaultFormats import org.json4s.jackson.Json import spray.json.DefaultJsonProtocol._ import scala.io.StdIn import scala.collection.mutable object Server { val settings = CorsSettings.defaultSettings.copy(allowedOrigins = HttpOriginRange.*) final case class AmList( po_status :String, po_date :String, new_sps_no :String, buyer_ref_no :String, equipment_en :String, buyer_company_name :String, buyeram :String, buyerbd :String, outside_total_amount :String, po_declaration_remarks:String) implicit val itemFormat = jsonFormat10(AmList) final case class UserGroup(items: List[AmList]) implicit val orderFormat = jsonFormat1(UserGroup) private val userGroup = mutable.ListBuffer[AmList]() final case class Info(seller_po_id:String, new_sps_no:String, equipment_en:String, buyer_ref_no:String, buyer_company_name:String, seller_company_name:String, outside_delivery_time:String, outside_total_amount:String, outside_delivery_amount:String, outside_term_payment_id:String, sellerbd:String, selleram:String, commission:String, to_usd_rate:String, usd_amount:String, po_status:String, po_date:String) implicit val itemFormat17 = jsonFormat17(Info) def main(args: Array[String]): Unit = { implicit val system = ActorSystem("sps_system") implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher /*这边一个大坑就是~这个东西 当时也是没自己看原作的代码 删了一堆没仔细看 第一个接口跑通了 后面总是有问题 后来仔细对照了才发现漏掉了~ 其余没什么问题 直接照抄就行 */ val Find: Route = (path("hello") & get & cors(settings)) { // 基本测试 complete("hello akka") } ~ (path("AmPage") & cors(settings)) { get { parameters('po_status.as[String], 'new_sps_no.as[String], 'pageSize.as[Int]) { (po_status, new_sps_no,pageSize) => { val res = Json(DefaultFormats).write(MysqlUtil.AmPage(po_status, new_sps_no,pageSize)) complete(res) } } } } ~ (path("originalRfq") & cors(settings)) { get { parameters('new_sps_no.as[String]) { new_sps_no => { val res2 = Json(DefaultFormats).write(MysqlUtil.originalRfq(new_sps_no)) complete(res2) } } } } ~ post { (path("insertAmSellerPo") & cors(settings)) { entity(as[Info]) { info => { val res = MysqlUtil.insertAmSellerPo( info.seller_po_id, info.new_sps_no, info.equipment_en, info.buyer_ref_no, info.buyer_company_name, info.seller_company_name, info.outside_delivery_time, info.outside_total_amount, info.outside_delivery_amount, info.outside_term_payment_id, info.sellerbd, info.selleram, info.commission, info.to_usd_rate, info.usd_amount, info.po_status, info.po_date) complete("done") } } } }~ post { (path("update") & cors(settings)) { entity(as[Info2]) { Info2 => { val res = MysqlUtil.updateAmSellerPo( Info2.id, Info2.name, Info2.age) complete("done") } } } } // 绑定ip和端口 val bindingFuture = Http().bindAndHandle(Find, "localhost", 9881) println(s"Server online at http://localhost:9881/\nPress RETURN to stop...") StdIn.readLine() bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate()) } }
其实到这边就可以结束了 原作还有一个客户端操作层 server直接启动后
进postman一样操作的 具体操作注意一下代码里面的post和get对应就行
最后是pom依赖
其实这个问题属实搞得我头大,最后也是用了下面的才成功
废话不多说直接贴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>com.scala-mysql.shipparts</artifactId> <version>1.0-SNAPSHOT</version> <name>com.scala-mysql.shipparts</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> <!-- Spark Core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.4</version> </dependency> <!-- Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.4</version> <scope>provided</scope> </dependency> <!-- Mysql connector --> <!-- https://mvnrepository.com/artifact/com.aliyun.odps/odps-sdk-core --> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>0.36.4-public</version> </dependency> <!-- https://mvnrepository.com/artifact/com.aliyun.odps/odps-spark-datasource --> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-spark-datasource_2.11</artifactId> <version>3.3.2-public</version> </dependency> <!-- https://mvnrepository.com/artifact/com.aliyun.odps/odps-spark-client --> <!-- https://mvnrepository.com/artifact/com.aliyun.odps/cupid-spark-2.x --> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-jdbc</artifactId> <version>3.0.1</version> <classifier>jar-with-dependencies</classifier> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.18.jre6</version> </dependency> <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>postgresql-holo</artifactId> <version>42.2.18.4</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.4</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.4</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>14.0.1</version> </dependency> <!-- <dependency> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-project-info-reports-plugin</artifactId> <version>3.0.0</version> </dependency>--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <!-- Restful Api --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http_2.11</artifactId> <version>10.1.9</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.11</artifactId> <version>2.5.23</version> </dependency> <dependency> <groupId>ch.megard</groupId> <artifactId>akka-http-cors_2.11</artifactId> <version>0.2.2</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http-spray-json_2.11</artifactId> <version>10.1.9</version> </dependency> <dependency> <groupId>org.scalaj</groupId> <artifactId>scalaj-http_2.11</artifactId> <version>2.4.1</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec --> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.15</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin --> <dependency> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.1</version> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-site-plugin</artifactId> <version>3.7.1</version> </plugin> <plugin> <artifactId>maven-project-info-reports-plugin</artifactId> <version>3.0.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.crawler.http.Main</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
最后一段
<executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.crawler.http.Main</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> </transformers> </configuration> </execution> </executions>
这个
如果是要用scala打jar包就一定要带上
还有这个
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
<version>5.1.5-jhyde</version>
<scope>test</scope>
</dependency>
这串代码也是研究好了很久 有jar包都没有用 alt键点了还是没有反应,
最后是把这个jar包上传到了自己的maven私有云上面
就是maven的那个setting.xml 带上自己的maven镜像 就可以了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。