golang 源碼分析:cayley-7-

        接着我們看下 writer 的實現,writer 的核心源碼位於 writer/single.go,writer 的註冊方式和存儲的註冊類似,它註冊了一個 single 的 writer

func init() {
  graph.RegisterWriter("single", NewSingleReplication)
type Single struct {
  qs         graph.QuadStore
  ignoreOpts graph.IgnoreOpts
}
func NewSingle(qs graph.QuadStore, opts graph.IgnoreOpts) (graph.QuadWriter, error) {
func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {

得到 writer 後就可以往裏面添加四元祖,它內部是調用了 quad 的

ApplyDeltas

  func (s *Single) AddQuad(q quad.Quad) error {
      return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
func (s *Single) AddQuadSet(set []quad.Quad) error {
      tx := graph.NewTransactionN(len(set))
      return s.qs.ApplyDeltas(tx.Deltas, s.ignoreOpts)

刪除過程類似

func (s *Single) RemoveQuad(q quad.Quad) error {
func (s *Single) RemoveNode(v quad.Value) error {
        for _, d := range []quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} {
    r := graph.NewResultReader(s.qs, s.qs.QuadIterator(d, gv))
    n, err := quad.Copy(del, r)
func (s *Single) ApplyTransaction(t *graph.Transaction) error {
  return s.qs.ApplyDeltas(t.Deltas, s.ignoreOpts)

graph/quadwriter.go 主要包含了插入和刪除兩個操作

type Procedure int8
const (
  Add    Procedure = +1
  Delete Procedure = -1
)

一個增量操作包含四元祖和具體動作兩個部分

type Delta struct {
  Quad   quad.Quad
  Action Procedure
}
func Unwrap(qs QuadStore) QuadStore {

我們創建一個圖數據庫 *Handle 的時候,就是初始化了 QuadStore 和 Writer 兩個屬性

type Handle struct {
  QuadStore
  QuadWriter
}
type IgnoreOpts struct {
  IgnoreDup, IgnoreMissing bool
}

writer 約定了一個接口,具體如下:

type QuadWriter interface {
  // AddQuad adds a quad to the store.
  AddQuad(quad.Quad) error
  // TODO(barakmich): Deprecate in favor of transaction.
  // AddQuadSet adds a set of quads to the store, atomically if possible.
  AddQuadSet([]quad.Quad) error
  // RemoveQuad removes a quad matching the given one  from the database,
  // if it exists. Does nothing otherwise.
  RemoveQuad(quad.Quad) error
  // ApplyTransaction applies a set of quad changes.
  ApplyTransaction(*Transaction) error
  // RemoveNode removes all quads which have the given node as subject, predicate, object, or label.
  //
  // It returns ErrNodeNotExists if node is missing.
  RemoveNode(quad.Value) error
  // Close cleans up replication and closes the writing aspect of the database.
  Close() error
}
type NewQuadWriterFunc func(QuadStore, Options) (QuadWriter, error)
var writerRegistry = make(map[string]NewQuadWriterFunc)

註冊函數定義如下:

func RegisterWriter(name string, newFunc NewQuadWriterFunc) {
  if _, found := writerRegistry[name]; found {
    panic("already registered QuadWriter " + name)
  }
  writerRegistry[name] = newFunc
}
func NewQuadWriter(name string, qs QuadStore, opts Options) (QuadWriter, error) {
  newFunc, hasNew := writerRegistry[name]
  if !hasNew {
    return nil, errors.New("replication: name '" + name + "' is not registered")
  }
  return newFunc(qs, opts)
}
func WriterMethods() []string {
type BatchWriter interface {
  quad.WriteCloser
  Flush() error
}
func NewWriter(qs QuadWriter) BatchWriter {
type batchWriter struct {
  qs  QuadWriter
  buf []quad.Quad
}
func (w *batchWriter) flushBuffer(force bool) error {
func (w *batchWriter) WriteQuad(q quad.Quad) error {
func NewTxWriter(tx *Transaction, p Procedure) quad.Writer {
type txWriter struct {
  tx *Transaction
  p  Procedure
}
func NewRemover(qs QuadWriter) BatchWriter {
type removeWriter struct {
  qs QuadWriter
}
func (w *removeWriter) WriteQuad(q quad.Quad) error {
  return w.qs.RemoveQuad(q)
func NewQuadStoreReader(qs QuadStore) quad.ReadSkipCloser {
func NewResultReader(qs QuadStore, it Iterator) quad.ReadSkipCloser {

和 Writer 對應,Reader 包含了 QuadStore 和迭代器

type quadReader struct {
  qs QuadStore
  it Iterator
}
func (r *quadReader) ReadQuad() (quad.Quad, error) {
  if r.it.Next(context.TODO()) {
    return r.qs.Quad(r.it.Result()), nil

imports.go

var (
  StartMorphism = path.StartMorphism
  StartPath     = path.StartPath
  NewTransaction = graph.NewTransaction
)

筆記哦核心的幾個變量類型

type Iterator = graph.Iterator
type QuadStore = graph.QuadStore
type QuadWriter = graph.QuadWriter
type Path = path.Path
type Handle struct {
  graph.QuadStore
  graph.QuadWriter
}

三元祖其實就是四元祖去掉 label

func Triple(subject, predicate, object interface{}) quad.Quad {
  return Quad(subject, predicate, object, nil)
func Quad(subject, predicate, object, label interface{}) quad.Quad {
  return quad.Make(subject, predicate, object, label)

初始化圖數據庫的時候,指定的 writer 就 single

func NewGraph(name, dbpath string, opts graph.Options) (*Handle, error) {
      qs, err := graph.NewQuadStore(name, dbpath, opts)
      qw, err := graph.NewQuadWriter("single", qs, nil)
func NewMemoryGraph() (*Handle, error) {
  return NewGraph("memstore", "", nil)

server/http/common.go 裏面提供了基於 http 服務的操作

func jsonResponse(w http.ResponseWriter, code int, err interface{}) {
        data, _ := json.Marshal(s)
  w.Write(data)
func HandleForRequest(h *graph.Handle, wtyp string, wopt graph.Options, r *http.Request) (*graph.Handle, error) {
      qs, err := g.ForRequest(r)
      qw, err := graph.NewQuadWriter(wtyp, qs, wopt)
      return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil

appengine/appengine.go

func open(cfg *Config) (*graph.Handle, error) {
      qs, err := graph.NewQuadStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
      qw, err := graph.NewQuadWriter(cfg.ReplicationType, qs, cfg.ReplicationOptions)
      return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil

graph/registry.go

func NewQuadStore(name string, dbpath string, opts Options) (QuadStore, error) {
      r, registered := storeRegistry[name]
      return r.NewFunc(dbpath, opts)
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/joMH_aPhwv1XLEyN-bQYKQ