华硕a43s(华硕a43s加装固态硬盘教程)速看
启动之后,增加 Storage 服务:注:增加 Storage 服务为 v3.x 版本以上所需操作,如果你使用的是 v2.x 可忽略本步骤。
本文首发于Nebula Graph Community 公众号一、Nebula Spark Connector 概念、适用场景、优势这里不做赘述,仅截图展示,更多详情参考文档:https://docs.nebula-graph.com.cn/nebula-spark-connector/ 2。
二、环境信息硬件环境名称值推荐本地磁盘 SSD2 T至少 2 TCPU16 C * 4128 C内存128 GB128 G软件环境名称版本号Nebula Graph3.0.0Nebula Spark Connector
3.0.0Hadoop2.7.2U17-10Spark2.4.5U5数据量级名称值数据量200 G实体 Vertext9.3 亿关系 Edge9.7 亿三、部署方案部署方式:分布式,3 个节点参考官网即可:https://docs.nebula-graph.com.cn/3.0.1/4.deployment-and-installation/2.compile-and-install-nebula-graph/deploy-nebula-graph-cluster/ 1
大体也就三部曲:下载内核 RPM 包并安装;批量修改配置文件;启动集群服务以下操作使用的 root,非 root 就加个 sudo 执行即可下载 Nebula Graph RPM 包并安装执行下面命令:。
wget https://os-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm wget https://oss-cdn.nebula-graph.com.cn/package
/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm.sha256sum.txt rpm -ivh nebula-graph-3.0.0.el7.x86_64.rpm注:默认安装路径:
/usr/local/nebula/,务必保证所在磁盘空间充足批量修改配置文件sed -i s?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.16.8.15:9559,172.16.8.176:9559,172.16.10.149:9559?g。
*.conf sed -i s?--local_ip=127.0.0.1?--local_ip=172.16.10.149?g*.conf sed -i s?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.16.8.15:9559,172.16.8.176:9559,172.16.10.149:9559?g
*.conf sed -i s?--local_ip=127.0.0.1?--local_ip=172.16.8.15?g*.conf sed -i s?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.16.8.15:9559,172.16.8.176:9559,172.16.10.149:9559?g
*.conf sed -i s?--local_ip=127.0.0.1?--local_ip=172.16.8.176?g*.conf注:ip 地址是内网地址,用来集群间通信启动之后,增加 Storage 服务:。
ADDHOSTS 172.x.x.15:9779,172.1x.x.176:9779,172.x.1x.149:9779;注:增加 Storage 服务为 v3.x 版本以上所需操作,如果你使用的是 v2.x 可忽略本步骤。
启动集群服务/usr/local/nebula/scripts/nebula.service startall上述命令启动服务,执行下面命令检查服务是否启动成功:ps aux|grep nebula结果如下 3 个服务进程:
/usr/local/nebula/bin/nebula-metad --flagfile /usr/local/nebula/etc/nebula-metad.conf /usr/local/nebula/bin/nebula-graphd --flagfile /usr/
local/nebula/etc/nebula-graphd.conf /usr/local/nebula/bin/nebula-storaged --flagfile /usr/local/nebula/etc/nebula-storaged.conf
注:如果少于 3 个,就多执行几次 /usr/local/nebula/scripts/nebula.service start all,再不行就 restart三、可视化服务我选择的是 Nebula Graph Studio,访问:http://n01v:7001 即可使用 Studio(注:这里是我自己的网络环境,读者不可访问)。
登录:10.x.x.1(任意节点):9669用户名/密码:root/nebula这里可以阅读下官方文档的常用 nGQL 命令:https://docs.nebula-graph.com.cn/3.0.1/2.quick-start/4.nebula-graph-crud
开始使用 Nebula Graph注册 Nebula 集群:ADDHOSTS 172.x.x.121:9779, 172.16.11.218:9779,172.16.12.12:9779;列出所有节点,查看 STATUS 列是否为 ONLINE,可通过
SHOW HOSTS; 或 SHOW HOSTS META;创建 Space,等价于传统数据库 database:CREATESPACE mylove (partition_num = 15, replica_factor = 。
3, vid_type = FIXED_STRING(256));//分区数推荐为节点数的5倍关系,副本数为基数,一般设置为3,vid如果为string类型,长度尽量够用就行,否则占用磁盘空间太多创建 Tag,等价于实体 Vertex:。
CREATE TAG entity (namestringNULL, versionstringNULL);创建 Edge,等价于关系 Edge:CREATE EDGE relation (namestring
NULL);查询时,务必添加 LIMIT,否则容易查死库:match (v) return v limit 100;四、(本文重点)使用 Spark Connector 读取 CSV 及入库这里可以参考 2 份资料:
官方的 NebulaSparkWriterExample(scala-json 格式):https://github.com/vesoft-inc/nebula-spark-utils/blob/master/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala 4
大神提供的 NebulaSparkWriterExample(java-json格式):https://www.jianshu.com/p/930e0343a28c 3附上 NebulaSparkWriterExample 的示例代码:
import com.facebook.thrift.protocol.TCompactProtocol import com.vesoft.nebula.connector.{ NebulaConnectionConfig, WriteMode, WriteNebulaEdgeConfig, WriteNebulaVertexConfig }
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory
object NebulaSparkWriter { privateval LOG = LoggerFactory.getLogger(this.getClass) var ip = "" def main(args: Array[String]):
Unit = { val part = args(0) ip = args(1) val sparkConf = new SparkConf sparkConf .
set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession .builder() .master("local") .config(sparkConf) .getOrCreate() spark.sparkContext.setLogLevel(
"WARN") if("1".equalsIgnoreCase(part)) writeVertex(spark) if("2".equalsIgnoreCase(part)) writeEdge(spark) spark.close() } def getNebulaConnectionConfig(): NebulaConnectionConfig = {
val config = NebulaConnectionConfig .builder() .withMetaAddress(ip + ":9559") .withGraphAddress(ip +
":9669") .withTimeout(Integer.MAX_VALUE) .withConenctionRetry(5) .build() config } def writeVertex(spark: SparkSession):
Unit = { LOG.info("start to write nebula vertices: 1 entity") val df = spark.read.option("sep"
, "\t").csv("/home/2022/project/origin_file/csv/tag/entity/").toDF("id", "name", "version") val config = getNebulaConnectionConfig()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig .builder() .withSpace(
"mywtt") .withTag("entity") .withVidField("id") .withVidAsProp(false) .withUser(
"root") .withPasswd("nebula") .withBatch(1800) .build() df.coalesce(1400).write.nebula(config, nebulaWriteVertexConfig).writeVertices() } def writeEdge(spark: SparkSession):
Unit = { LOG.info("start to write nebula edges: 2 entityRel") val df = spark.read.option("sep"
, "\t").csv("/home/2022/project/origin_file/csv/out/rel/relation/").toDF("src", "dst", "name") val
config = getNebulaConnectionConfig() val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig .builder() .withSpace(
"mywtt") .withEdge("relation") .withSrcIdField("src") .withDstIdField("dst") .withSrcAsProperty(
false) .withDstAsProperty(false) .withUser("root") .withPasswd("nebula") .withBatch(
1800) .build() df.coalesce(1400).write.nebula(config, nebulaWriteEdgeConfig).writeEdges() } }
重点详解 NebulaSparkWriterExample 示例代码这里讲解一些函数项:spark.sparkContext.setLogLevel("WARN"):设置日志打印级别,防止 INFO 干扰;
withTimeout(Integer.MAX_VALUE):连接超时时间尽量大一些,默认为 1 分钟,超时次数大于重试次数后,Spark 任务就失败了;option("sep", "\t"):指定 CSV 文件的分隔符,否则就默认为 1 列了;
toDF("src", "dst", "name"):数据集指定 Schema,即 Dataset 转 DataFrame,否则就不能指定 VidField 了;withVidField("id")
:因为该函数只支持设置列名称,所以必须定义 Schema;withVidAsProp(false):默认 ID 为 VID 字段,数据就不用重复存储为属性了,占用磁盘空间;withSrcIdField("src")
:设置起始节点的 IdField;withDstIdField("dst"):设置终止节点的 IdField;withSrcAsProperty(false):节省空间withDstAsProperty(false)
:节省空间withBatch(1000):批量大小,WriteMode.UPDATE 默认 <=512,WriteMode.INSERT 可以设置大一些(千兆网卡/带宽 5Gbps /本地 SSD = 1500)
coalesce(1500):可根据任务并发数调节单个 partition 数据量过大,容易导致 executor OOM;五、提交任务到 Spark 集群nohup spark-submit --master yarn --deploy-mode client --。
classcom.xxx.nebula.connector.NebulaSparkWriter --confspark.dynamicAllocation.enabled=false --conf spark.executor.memoryOverhead=
10g --conf spark.blacklist.enabled=false --conf spark.default.parallelism=1000 --driver-memory 10G --executor-memory
12G --executor-cores 4 --num-executors 180 ./example-3.0-SNAPSHOT.jar > run-csv-nebula.log 2>&1 &辅助监控 iotop 命令
Total DISK READ :26.61K/s|Total DISK WRITE :383.77M/sActual DISK READ:26.61K/s|Actual DISK WRITE:431.75
M/s辅助监控 top 命令top-16:03:01up8days,28min,1user,load average:6.16,6.53,4.58Tasks:205total,1running,204sleeping,
0stopped,0zombie%Cpu(s):28.3us,14.2sy,0.0ni,56.0id,0.6wa,0.0hi,0.4si,0.5stKiB Mem :13186284+total,1135004
free,31321240used,99406592buff/cacheKiB Swap:0total,0free,0used.99641296availMemPIDUSERPRNIVIRTRESSHR
S%CPU%MEMTIME+COMMAND27979root20039.071g0.026t9936S564.620.883:22.03nebula-storaged27920root2002187476
8040367672S128.20.617:13.75nebula-graphd27875root20064846441.990g8588S58.51.614:14.22nebula-metad其他资源监控
服务优化nebula-storaged.conf 配置优化这里我修改了 nebula-storaged.conf 配置项:# 一个批处理操作的默认保留字节--rocksdb_batch_size=4096
# BlockBasedTable中使用的默认块缓存大小# 单位为 MB. 服务器内存128G,一般设置为三分之一--rocksdb_block_cache=44024############## rocksdb Options ##############
--rocksdb_disable_wal=true# rocksdb DBOptions在json中,每个option的名称和值都是一个字符串,如:“option_name”:“option_value”,逗号分隔
--rocksdb_db_options={"max_subcompactions":"3","max_background_jobs":"3"} # rocksdb ColumnFamilyOptions在json中,每个option的名称和值都是字符串,如:“option_name”:“option_value”,逗号分隔
--rocksdb_column_family_options={"disable_auto_compactions":"false","write_buffer_size":"67108864","max_write_buffer_number"
:"4","max_bytes_for_level_base":"268435456"} # rocksdb BlockBasedTableOptions在json中,每个选项的名称和值都是字符串,如:“option_name”:“option_value”,逗号分隔
--rocksdb_block_based_table_options={"block_size":"8192"} # 每个请求最大的处理器数量--max_handlers_per_req=10# 集群间心跳间隔时间
--heartbeat_interval_secs=10--raft_rpc_timeout_ms=5000--raft_heartbeat_interval_secs=10--wal_ttl=14400
# 批量大小最大值--max_batch_size=1800# 参数配置减小内存应用--enable_partitioned_index_filter=true# 数据在最底层存储层间接做了过滤,生产环境防止遇到查到超级节点的困扰
--max_edge_returned_per_vertex=10000Linux 系统优化ulimit-c unlimitedulimit-n 130000sysctl-w net.ipv4.tcp_slow_start_after_idle=0
sysctl-w net.core.somaxconn=2048sysctl-w net.ipv4.tcp_max_syn_backlog=2048sysctl-w net.core.netdev_max_backlog=3000
sysctl-w kernel.core_uses_pid=1六、验证导入结果SUBMIT JOB STATS; SHOW JOB ${ID} SHOW STATS;实体插入速率大约 27,837 条/s
(仅适用本次导入性能计算)关系插入速率大约 26,276 条/s (仅适用本次导入性能计算)如果服务器配置更好,性能会更好;另外带宽、是否跨数据中心、磁盘 IO 也是影响性能因素,甚至是网络波动等[root@node02
nebula]#df-hFilesystemSizeUsedAvailUse%Mountedon/dev/sda150G2.2G48G5%//dev/sdb12.0T283G1.6T16%/usr/local/nebula
tmpfs13G013G0%/run/user/62056七、性能测试根据属性查询指定节点:MATCH (v:entity) WHERE v.entity.name == Lifespan RETURN v;
执行时间消耗 0.002558 (s)一跳MATCH (v1:entity)-[e:propertiesRel]->(v2:attribute) WHERE id(v1) == 70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a
RETURN v2 limit 100;执行时间消耗 0.003571 (s)两跳MATCH p=(v1:entity)-[e:propertiesRel*1..2]->(v2) WHERE id(v1
) == 70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a RETURN p;执行时间消耗 0.005143 (s)获取边的所有属性值
FETCH PROP ON propertiesRel 70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a -> 0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256
YIELD properties(edge);执行时间消耗 0.001304 (s)match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1]->(v2)
return p;执行时间消耗 0.02986 (s)match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*2]->(v2) return p;
执行时间消耗 执行时间消耗 0.07937 (s)match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*3]->(v2) return p;执行时间消耗 0.269 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*4]->(v2) return p;执行时间消耗 3.524859 (s)match p=(v:entity{name:
"张三"})-[e:entityRel|propertiesRel*1..2]->(v2) return p;执行时间消耗 0.072367 (s)match p=(v:entity{name:"张三"
})-[e:entityRel|propertiesRel*1..3]->(v2) return p;执行时间消耗 0.279011 (s)match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*
1..4]->(v2) return p;执行时间消耗 3.728018 (s)查询点A_vid到点B_vid的最短路径(双向),携带点和边的属性:FIND SHORTEST PATH WITH PROP
FROM"70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a"TO"0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256"
OVER * BIDIRECT YIELD pathAS p;执行时间消耗 0.003096 (s)FIND ALL PATH FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a"
TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * WHERE propertiesRel.name is not
EMPTYor propertiesRel.name >=0YIELD path AS p;执行时间消耗 0.003656 (s)八、遇到的问题:1.guava 依赖包版本冲突问题Caused by: java.lang.NoSuchMethodError: com.google.common.
base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;经排查发现依赖的一个模块使用 guava 版本 22.0,而 Spark 集群自带 14.0,导致冲突,而无法正常工作。
运行在 Spark 集群上的任务,Spark 加载 guava 包优先级高于自己的包我们依赖的包使用到 guava 版本 22.0 中比较新的方法,而在 14.0 版本还没有这样的方法在不能修改对方代码的前提下,有如下方案:。
spark 集群的包升级一下,风险较高,容易造成未知问题另外一种方式是利用 Maven 插件重命名自己的 guava 包这里采用了第二种方式,利用 Maven 插件 shade(链接:https://maven.apache.org/plugins/maven-shade-plugin/ 1)重命名包解决问题。
org.apache.maven.pluginsmaven-shade-plugin
>3.2.4packageshade
>com.google.commonmy_guava.common
>*:*META-INF/maven/**
META-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSA
>2.Spark 黑名单机制问题Blacklisting
behavior can be configured via spark.blacklist.*.spark.blacklist.enabled,默认值 false如果这个参数这为 true,那么 Spark 将不再会往黑名单里面的执行器调度任务。
黑名单算法可以由其他 spark.blacklist 配置选项进一步控制,详情参见下面的介绍欢迎到 Nebula 论坛与作者讨论交流:https://discuss.nebula-graph.com.cn。
- 标签:
- 编辑:李松一
- 相关文章
-
DELL 保修(dell 保修期内 键盘回弹不灵)全程干货
前面说了鼠标会出现的故障,这回咱们说说键盘可能会出现故障的原因,一起来看看吧~在使用键盘的过程中,故障有多种表现形式,同时故障原…
-
华擎主板驱动(华擎主板驱动盘上面的英文字母都是什么)居然可以这样
2月12日下周一,AMD将发布全新的桌面款RyzenAPU。基于14nm工艺和Zen+Vega架构,而在4月份,我们将迎来第二代RyzenCPU…
- 联想官网下载(联想官网下载中心)全程干货
- hd7000(HD7000系列)深度揭秘
- 成都华为(成都华为公司待遇怎样)万万没想到
- s420(s420gd是什么材质)墙裂推荐
- 笔记本性价比(戴尔笔记本性价比)这都可以