【注:由于 farcaster 的代码库非常活跃,若本文内容和你看到的代码之间有出入,请以代码为准。】
近期对 web3 当红项目 farcaster 代码进行了粗略的研究,重点关注了其 decentralized node 实现 hub。并且基于学到的知识以 quick and dirty 的方式实现了一个简单的原型,本文则是这段时间内学习成果的汇总。
之所以研究它,原因无它:正在构思一个 decentralized node 实现,而 farcaster 的技术栈与我们团队高度吻合。
如果作为 web3 从业者还不清楚 farcaster 为何物,那么建议充分利用搜索引擎或直接问近期崛起的 AI 服务。要是用一句话介绍的话:farcaster 就是 web3 版的 twitter。
它的整体架构如下图:
[来源:https://docs.farcaster.xyz/assets/architecture.T7tCPEnC.png]
由上图可知,hub 组成了其去中心化的网络,它也正是本文的重点。至于其他内容,在此略过不谈。
Hub 的架构如下:
[来源:https://docs.farcaster.xyz/assets/hub.PU02ORHT.png]
作为一个去中心化节点,hub 的职责:
最后请注意:hub node 并非一个 blockchain 节点,它只是一个 offchain 的去中心化节点。
hub 是一个多线程 node 应用,充分利用了 worker 技术来划分节点的不同职责:p2p node、db、merkel 同步 ……
同时还利用 ReadWriteLock 来保证线程安全。
不过,farcaster 使用的 node 自带的 worker 库,代码不免有些繁琐。在此,我推荐使用 threads.js ,它可极大简化代码编写,其 github readme 中的例子充分体现了这一点。其典型特性:
parentPort
与父线程通信。termination
、 message
……熟悉 node worker 编程的小伙伴可能会说还有: piscina 。阔是经过人肉测试之后,个人更喜欢 threads.js。
farcaster 采用 rocksdb 来作为其节点数据库,这也是很多 blockchain node 的选择,因为其读写速度飞快。
阔是,rocksdb 在 node 生态实在更新太慢,各位可自行去搜搜它的最近一次发布是什么时候。更要命的是,其版本混乱,甚至于 rocksdb npm 的 github repo 都是在其老祖宗 leveldb github repo 之下 😓。
farcaster 后来 fork 了这个库,可是并未添加什么实质性更新,仅仅做了一些关于平台编译的微调,列位看官可点击此处自行查看。
甚至于 eth 的 node 实现 lodestar 也用的是 leveldb,而不是 rocksdb。
估计 farcaster 团队也被此弄烦了,再加上内存和性能的问题,在近期代码中,rocksdb 相关代码已经采用组合技术实现:node rust bridge + rust rocksdb cargo。在完成迁移之后,还在 twitter 上欣喜的说:内存少了,性能高了!
关于 node 和 rust addon 的集成,由于时间和精力关系,我没有深入研究。目前初步看来至少有两种流行方案:
hub 采用 grpc 进行两方面的通信:
并通过 grpc-web
来提供 http 接口。
hub 使用 ts-proto 来完成 protobuf 的代码生成,直接使用 grpc
和 grpc-web
来用于 server 端实现,至于 http server 则是用 fastify 来完成。
跟直接使用 worker 库一样,这种方式导致代码繁琐,建议采用 connectrpc 快速实现。
libp2p 是去中心化节点实现的基础,不少 blockchain 实现也是基于它来完成,比如 eth 的 node 实现 lodestar。
但它本身是一组大的协议栈,若对于其组成部分缺乏基本了解,则无法用好,建议先阅读协议再看配置。
hub 使用 gossipsub 实现节点间的消息传播,并利用 connectionGater
来实现对于节点连接的控制。
但可能是代码实现时间较早的缘故,根据现有协议规范,若自行实现时,可以考虑若干增强:
基本代码架子如下:
createLibp2p({
peerId: await createFromPrivKey(
await unmarshalPrivateKey(Buffer.from(peerPrivateKey, 'hex'))
),
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0'],
},
connectionGater: {
denyDialPeer: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyDialMultiaddr: async (multiaddr: Multiaddr) => {
return await notAllowed(multiaddr.getPeerId()!.toString());
},
denyInboundConnection: () => {
return false;
},
denyOutboundConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayReservation: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
},
connectionManager: {
autoDialInterval: 1000,
},
transports: [tcp()],
connectionEncryption: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [
mdns({
interval: 1000,
}),
pubsubPeerDiscovery(),
],
services: {
pubsub: gossipsub(),
identify: identify(),
},
})
.then(async node => {
libp2p = node;
node.addEventListener('peer:discovery', async evt => {
console.log('Discovered %s', evt.detail.id.toString());
});
node.addEventListener('peer:connect', async evt => {
console.log('Connected to %s', evt.detail.toString());
checkPeersPermissionPeriodically();
});
node.services.pubsub.addEventListener('message', message => {
if (message.detail.topic === messageTopic) {
parentPort?.postMessage({
topic: message.detail.topic,
message: message.detail.data.toString(),
});
}
});
node.services.pubsub.addEventListener('gossipsub:message', message => {});
node.services.pubsub.subscribe(messageTopic);
await node.start();
console.log('listening on addresses:');
node.getMultiaddrs().forEach(addr => {
console.log(addr.toString());
});
const stop = async () => {
await node.stop();
console.log('libp2p has stopped');
process.exit(0);
};
process.on('SIGTERM', stop);
process.on('SIGINT', stop);
})
.catch((e: any) => console.error);
}
CRDT 全称:Conflict Free Replicated Data Types,广泛用于主主复制、文档协同编辑等场景。其特点是:最终一致性,此术语各位应该不会太陌生。
hub 采用 CRDT 来定义其 storage,采用的是 2p-set,看起来吓人其实就是:
在研究代码的过程中,发现一个非常不错的 CRDT 系列文章,第一篇的链接:https://www.bartoszsypytkowski.com/the-state-of-a-state-based-crdts/ ,有兴趣的可以去仔细看看。同时,也可以研究一下大名鼎鼎的yjs。
hub 使用该项技术来实现节点之间的数据同步,即在保存本地消息数据时同步生成 merkel trie,在同步周期到了之后,节点交换各自的 merkel tries,以看是否有交换消息的必要。
此处让我不解的是为何 farcaster 要自行去实现一个 merkel trie,或许在当初 node 生态中尚未如现在一样有众多的 merkel tree 的实现。
如果不考虑精细的控制和性能,一种快糙猛的 merkel 同步实现方式如下:
至于 merkel proof 的生成,采用成熟的 npm 即可,如 merkeltree.js。此外,openzepplin 也提供了一个实现,而且提供了安全性增强,各位可根据自己的喜好自行选择。
如众多开源项目一样,farcaster 采用的是 monorepo 实践,基于 turbo。并且广泛的采用了基于 rust 的 js/ts 工具链:
关于 tooling,请参见我们近期的研究。
如果想了解 decentralized app 的编写套路,farcaster 无疑是一个非常好的参考,本文列出了其中涉及的关键技术,相信此文将会激发各位的思路以及自行探索的兴趣。
觉得有帮助的话,不妨考虑购买付费文章来支持我们 🙂 :
付费文章