Spark-Mongo-Connector

Official documentation

https://docs.mongodb.com/spark-connector/current/

sbt dependency

libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0"
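For context, a minimal build.sbt sketch this dependency line could sit in; the Scala 2.11.8 and Spark 2.0.2 versions and the "provided" scope are assumptions, adjust them to match your cluster:

name := "spark-mongo-demo"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.0.2" % "provided",
  "org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0"
)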

Write configuration

Set the required options (uri, database, and collection) either in SparkConf or in the $SPARK_HOME/conf/spark-defaults.conf file, as shown below.

package com.neusoft.apps

import com.mongodb.spark.{MongoConnector, MongoSpark}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import com.mongodb.spark.config._

// URI for a MongoDB cluster: list every host comma-separated and put database.collection after the last host; for a standalone MongoDB, a single host is enough
val uri = """mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017/db.test"""

val conf = new SparkConf()
      .set("spark.mongodb.output.uri", uri)) // 写配置

val sparkSession = SparkSession.builder().config(conf).appName("learn something").getOrCreate()
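Alternatively, the same keys can be set once in $SPARK_HOME/conf/spark-defaults.conf so every job picks them up; the hosts and db.test below are placeholders:

spark.mongodb.output.uri  mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017/db.test
spark.mongodb.input.uri   mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017/db.test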

When a later step needs to write to a different collection, override the defaults with a WriteConfig:

import scala.collection.mutable.HashMap

val writeVectorMap = new HashMap[String, String]
// CollectionDict.VISIT_VECTOR is a project-specific constant holding the target collection name
writeVectorMap += ("collection" -> CollectionDict.VISIT_VECTOR)
writeVectorMap += ("writeConcern.w" -> "majority")

// Options in writeVectorMap override the defaults derived from the SparkSession
val writeVectorConfig = WriteConfig(writeVectorMap, Some(WriteConfig(sparkSession)))

MongoSpark.save(similarityDocRDD, writeVectorConfig)
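For illustration only, one way an RDD such as similarityDocRDD might be built as org.bson.Document values and saved with the overriding WriteConfig above; the userId and score fields are invented for this sketch:

import org.bson.Document

// Hypothetical data: build an RDD[Document] and write it with the WriteConfig defined above
val similarityDocRDD = sparkSession.sparkContext
  .parallelize(Seq(("user-1", 0.87), ("user-2", 0.42)))
  .map { case (id, score) => new Document("userId", id).append("score", Double.box(score)) }

MongoSpark.save(similarityDocRDD, writeVectorConfig)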

Read configuration

package com.neusoft.apps

import com.mongodb.spark.{MongoConnector, MongoSpark}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import com.mongodb.spark.config._

// URI for a MongoDB cluster: list every host comma-separated and put database.collection after the last host; for a standalone MongoDB, a single host is enough
val uri = """mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx.xxx:27017/db.test"""

val conf = new SparkConf()
      .set("spark.mongodb.input.uri", uri) // 读配置

val sparkSession = SparkSession.builder().config(conf).appName("learn something").getOrCreate()
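With spark.mongodb.input.uri set, the default collection can also be loaded straight into a DataFrame, since the connector infers a schema by sampling documents; a minimal sketch:

// Load the collection named in spark.mongodb.input.uri as a DataFrame
val df = MongoSpark.load(sparkSession)
df.printSchema()
df.show(5)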

When a later step needs to read from a different collection, override the defaults with a ReadConfig:

import scala.collection.mutable.HashMap

val readVectorMap = new HashMap[String, String]
// CollectionDict.VISIT_REAL_DATA is a project-specific constant holding the source collection name
readVectorMap += ("collection" -> CollectionDict.VISIT_REAL_DATA)
readVectorMap += ("readPreference.name" -> "secondaryPreferred")
// On MongoDB versions below 3.4 the partitioner must be set explicitly; MongoShardedPartitioner is meant for sharded clusters
// readVectorMap += ("partitioner" -> "MongoShardedPartitioner")
// MongoSplitVectorPartitioner targets standalone and replica set deployments; from 3.4 on the default is MongoDefaultPartitioner,
// a wrapper around MongoSamplePartitioner, the sampling-based partitioner that works on every deployment type
// Note: using MongoShardedPartitioner against a replica set fails to split the data, leaving only a single task
readVectorMap += ("partitioner" -> "MongoSplitVectorPartitioner")
val readVectorConfig = ReadConfig(readVectorMap, Some(ReadConfig(sparkSession)))
val visitData = MongoSpark.load(sparkSession.sparkContext, readVectorConfig)
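visitData is a MongoRDD[Document], so a filter can be pushed down to MongoDB as an aggregation pipeline before data reaches Spark, or the RDD can be turned into a DataFrame; a rough sketch, with the status field purely illustrative:

import org.bson.Document

// Push a $match stage down to MongoDB instead of filtering in Spark
val filtered = visitData.withPipeline(Seq(Document.parse("""{ "$match": { "status": "active" } }""")))
println(s"matched documents: ${filtered.count()}")

// Or convert the loaded RDD to a DataFrame (schema inferred by sampling)
val visitDF = visitData.toDF()
visitDF.show(5)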
