golang 源碼分析:cayley-10-

下面分析一下 mysql 作爲後端存儲的時候,數據是如何存儲的。它的核心源碼位於 graph/sql/mysql/mysql.go,首先定義了存儲類型爲 mysql,然後進行註冊。

// Type is the name under which this backend is registered via csql.Register.
const Type = "mysql"
// init registers the MySQL flavor with csql under the name Type.
// NOTE(review): the article truncates the Registration literal after its first
// two fields; the remaining fields and the closing braces are not shown here.
func init() {
  csql.Register(Type, csql.Registration{
    Driver:               "mysql", // sql driver to use on dial
    // Hash columns are declared as BINARY(n) with n = quad.HashSize.
    HashType:             fmt.Sprintf(`BINARY(%d)`, quad.HashSize),
// runTxMysql applies a batch of node and quad updates using the transaction tx.
// NOTE(review): this excerpt is abridged by the article — parts of the error
// handling, the branches for negative RefInc / deleted quads, and the closing
// braces are elided, so only the insert paths are visible here.
func runTxMysql(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error {
      for _, n := range nodes {
    // Non-negative reference delta: take the insert/increment path.
    if n.RefInc >= 0 {
      nodeKey, values, err := csql.NodeValues(csql.NodeHash{n.Hash}, n.Val)
      if err != nil {
        return err
      }
      // Argument order follows the statement below: the refs value first, then
      // whatever NodeValues returned, then the increment again for the UPDATE clause.
      values = append([]interface{}{n.RefInc}, values...)
      values = append(values, n.RefInc) // one more time for UPDATE
      // Prepared statements are looked up per nodeKey in insertValue
      // (declared in the elided part of the function).
      stmt, ok := insertValue[nodeKey]
      if !ok {
        var ph = make([]string, len(values)-1) // excluding last increment
        for i := range ph {
          ph[i] = "?"
        }
        // Insert the node row; on a duplicate key add the increment to refs instead.
        stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` +
          strings.Join(nodeKey.Columns(), ", ") +
          `) VALUES (` + strings.Join(ph, ", ") +
          `) ON DUPLICATE KEY UPDATE refs = refs + ?;`)
      _, err = stmt.Exec(values...)
      for _, d := range quads {
    // Collect the SQL value of every direction hash of the quad as statement arguments.
    dirs := make([]interface{}, 0, len(quad.Directions))
    for _, h := range d.Quad.Dirs() {
      dirs = append(dirs, csql.NodeHash{h}.SQLValue())
    }
    if !d.Del {
      // The quad INSERT is prepared only while insertQuad is still nil.
      // NOTE(review): `ignore` is defined in the elided code; presumably derived from opts — confirm.
      if insertQuad == nil {
        insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)
        if err != nil {
      _, err := insertQuad.Exec(dirs...)

以上代碼實現了把頂點和四元組插入 mysql。

func convInsertError(err error) error {

graph/sql/database.go

// types maps a registered backend name to its Registration.
var types = make(map[string]Registration)
// Register stores the SQL flavor f under name and then calls registerQuadStore
// with name as both arguments. It panics when f.Driver is empty.
// NOTE(review): the function's closing brace is missing from this excerpt.
func Register(name string, f Registration) {
  if f.Driver == "" {
    panic("no sql driver in type definition")
  }
  types[name] = f
  registerQuadStore(name, name)
// Registration describes one SQL backend flavor: the driver to dial with, the
// column types used when generating table definitions, capability flags of the
// database, and backend-specific hooks.
type Registration struct {
  Driver             string // sql driver to use on dial; Register panics if it is empty
  HashType           string // type for hash fields; table definitions fall back to BYTEA when empty
  BytesType          string // type for binary fields; nodes table falls back to BYTEA when empty
  TimeType           string // type for datetime fields; nodes table falls back to "timestamp with time zone" when empty
  HorizonType        string // type for horizon counter; quads table falls back to SERIAL when empty
  NodesTableExtra    string // extra SQL to append to nodes table definition
  ConditionalIndexes bool   // database supports conditional indexes (unique indexes with a WHERE clause)
  FillFactor         bool   // database supports fill percent on indexes (WITH (FILLFACTOR = n))
  NoForeignKeys      bool   // database has no support for FKs; FK constraints are skipped when set
  // QueryDialect is embedded.
  // NOTE(review): presumably this is what the query code hands to NewBuilder — confirm.
  QueryDialect
  NoOffsetWithoutLimit bool // SELECT ... OFFSET can be used only with LIMIT
  Error               func(error) error         // error conversion function
  Estimated           func(table string) string // query string that returns an estimated number of rows in table
  // RunTx is the backend hook with the signature used by runTxMysql: it receives
  // the transaction plus the node and quad updates to apply.
  RunTx               func(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error
  // NOTE(review): presumably wraps stmts with backend-specific retry logic — confirm.
  TxRetry             func(tx *sql.Tx, stmts func() error) error
  // NOTE(review): presumably set when schema changes cannot run inside a transaction — confirm.
  NoSchemaChangesInTx bool
}

創建節點表

// nodesTable returns the CREATE TABLE statement for the nodes table.
//
// The hash, binary and timestamp column types come from the registration and
// fall back to BYTEA, BYTEA and "timestamp with time zone" when left empty.
// A non-empty NodesTableExtra is appended as one more comma-separated entry
// before the closing parenthesis.
func (r Registration) nodesTable() string {
  // orDefault yields typ unless it is empty, in which case fallback is used.
  orDefault := func(typ, fallback string) string {
    if typ == "" {
      return fallback
    }
    return typ
  }
  hashType := orDefault(r.HashType, "BYTEA")
  bytesType := orDefault(r.BytesType, "BYTEA")
  timeType := orDefault(r.TimeType, "timestamp with time zone")

  closing := "\n);"
  if extra := r.NodesTableExtra; extra != "" {
    closing = ",\n" + extra + closing
  }

  return "CREATE TABLE nodes (\n" +
    "  hash " + hashType + " PRIMARY KEY,\n" +
    "  refs INT NOT NULL,\n" +
    "  value " + bytesType + ",\n" +
    "  value_string TEXT,\n" +
    "  datatype TEXT,\n" +
    "  language TEXT,\n" +
    "  iri BOOLEAN,\n" +
    "  bnode BOOLEAN,\n" +
    "  value_int BIGINT,\n" +
    "  value_bool BOOLEAN,\n" +
    "  value_float double precision,\n" +
    "  value_time " + timeType +
    closing
}

創建四元組表

// quadsTable returns the CREATE TABLE statement for the quads table.
//
// The hash column type and the horizon (primary key) column type come from the
// registration, defaulting to BYTEA and SERIAL when left empty. The ts column
// is always declared as plain "timestamp".
func (r Registration) quadsTable() string {
  hashType, horizonType := r.HashType, r.HorizonType
  if hashType == "" {
    hashType = "BYTEA"
  }
  if horizonType == "" {
    horizonType = "SERIAL"
  }

  return "CREATE TABLE quads (\n" +
    "  horizon " + horizonType + " PRIMARY KEY,\n" +
    "  subject_hash " + hashType + " NOT NULL,\n" +
    "  predicate_hash " + hashType + " NOT NULL,\n" +
    "  object_hash " + hashType + " NOT NULL,\n" +
    "  label_hash " + hashType + ",\n" +
    "  ts timestamp\n" +
    ");"
}

創建相關索引

// quadIndexes returns the statements that create the constraints and indexes
// of the quads table, in this order:
//
//  1. two unique indexes over (subject, predicate, object[, label]); when the
//     database supports conditional indexes they are split on label_hash being
//     NULL / NOT NULL;
//  2. foreign keys from every *_hash column to nodes(hash), unless the
//     registration says foreign keys are unsupported;
//  3. four lookup indexes (SPO, OPS, POS, OSP), each optionally carrying a
//     FILLFACTOR taken from the "db_fill_factor" option (default 50).
func (r Registration) quadIndexes(options graph.Options) []string {
  stmts := make([]string, 0, 10)

  if !r.ConditionalIndexes {
    stmts = append(stmts,
      `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash);`,
      `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash);`,
    )
  } else {
    stmts = append(stmts,
      `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash) WHERE label_hash IS NULL;`,
      `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash) WHERE label_hash IS NOT NULL;`,
    )
  }

  if !r.NoForeignKeys {
    stmts = append(stmts,
      `ALTER TABLE quads ADD CONSTRAINT subject_hash_fk FOREIGN KEY (subject_hash) REFERENCES nodes (hash);`,
      `ALTER TABLE quads ADD CONSTRAINT predicate_hash_fk FOREIGN KEY (predicate_hash) REFERENCES nodes (hash);`,
      `ALTER TABLE quads ADD CONSTRAINT object_hash_fk FOREIGN KEY (object_hash) REFERENCES nodes (hash);`,
      `ALTER TABLE quads ADD CONSTRAINT label_hash_fk FOREIGN KEY (label_hash) REFERENCES nodes (hash);`,
    )
  }

  // Column orders of the lookup indexes; the index name is built from the
  // one-character prefix of each direction.
  orders := [][3]quad.Direction{
    {quad.Subject, quad.Predicate, quad.Object},
    {quad.Object, quad.Predicate, quad.Subject},
    {quad.Predicate, quad.Object, quad.Subject},
    {quad.Object, quad.Subject, quad.Predicate},
  }
  // The lookup error is deliberately dropped, exactly as before.
  fill, _ := options.IntKey("db_fill_factor", 50)
  for _, order := range orders {
    prefix := ""
    columns := make([]string, 0, len(order))
    for _, dir := range order {
      prefix += string(dir.Prefix())
      columns = append(columns, dir.String()+"_hash")
    }

    tail := ";"
    if r.FillFactor {
      tail = fmt.Sprintf(" WITH (FILLFACTOR = %d)", fill) + tail
    }
    create := fmt.Sprintf(`CREATE INDEX %s_index ON quads (%s)`,
      prefix, strings.Join(columns, ", "))
    stmts = append(stmts, create+tail)
  }
  return stmts
}

graph/sql/iterator.go

// Compile-time check that *QuadStore satisfies shape.Optimizer.
var _ shape.Optimizer = (*QuadStore)(nil)
// OptimizeShape delegates to qs.opt.OptimizeShape and returns its result unchanged.
// NOTE(review): the function's closing brace is missing from this excerpt.
func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) {
  return qs.opt.OptimizeShape(s)

根據查詢,組裝 sql 語句:

// prepareQuery turns the shape s into SQL text plus its argument list.
// The arguments are the SQLValue() of every entry of s.Args(), in order; the
// text is produced by s.SQL with a builder created for the store's query dialect.
// NOTE(review): the function's closing brace is missing from this excerpt.
func (qs *QuadStore) prepareQuery(s Shape) (string, []interface{}) {
  args := s.Args()
  vals := make([]interface{}, 0, len(args))
  for _, a := range args {
    vals = append(vals, a.SQLValue())
  }
  b := NewBuilder(qs.flavor.QueryDialect)
  qu := s.SQL(b)
  return qu, vals

執行查詢

// QueryRow builds the SQL for s with prepareQuery and runs it through
// qs.db.QueryRowContext, returning the resulting *sql.Row.
// NOTE(review): the function's closing brace is missing from this excerpt.
func (qs *QuadStore) QueryRow(ctx context.Context, s Shape) *sql.Row {
  qu, vals := qs.prepareQuery(s)
  return qs.db.QueryRowContext(ctx, qu, vals...)
// Compile-time check that *Iterator satisfies graph.IteratorFuture.
var _ graph.IteratorFuture = (*Iterator)(nil)
// NewIterator creates an Iterator for the query s. The inner iterator comes from
// qs.newIterator; the embedded graph.Iterator is built from it with graph.NewLegacy.
// NOTE(review): the function's closing brace is missing from this excerpt.
func (qs *QuadStore) NewIterator(s Select) *Iterator {
  it := &Iterator{
    it: qs.newIterator(s),
  }
  // NewLegacy needs the finished Iterator itself, so this is set after construction.
  it.Iterator = graph.NewLegacy(it.it, it)
  return it
// Iterator pairs an iterator2 with the graph.Iterator that NewIterator builds
// from it via graph.NewLegacy; the latter is embedded, so its methods are
// promoted onto Iterator.
type Iterator struct {
  it *iterator2 // inner iterator created by qs.newIterator
  graph.Iterator // wrapper produced by graph.NewLegacy(it.it, it)
}

迭代器有兩個

// Compile-time check that *iterator2 satisfies graph.IteratorShapeCompat.
var _ graph.IteratorShapeCompat = (*iterator2)(nil)
// iterator2 binds a Select query to the QuadStore it is evaluated against.
type iterator2 struct {
  qs    *QuadStore // store handed to newIteratorNext and used for the query dialect in String
  query Select     // query this iterator represents
  err   error
}
// Iterate returns a scanner over the iterator's query, created by
// newIteratorNext for the same store and query.
func (it *iterator2) Iterate() graph.Scanner {
  store, query := it.qs, it.query
  return newIteratorNext(store, query)
}
// Stats reports the iterator's costs: Size comes from it.getSize(ctx), while
// NextCost and ContainsCost are the fixed values 1 and 10. The error from
// getSize is returned alongside the costs.
// NOTE(review): the function's closing brace is missing from this excerpt.
func (it *iterator2) Stats(ctx context.Context) (graph.IteratorCosts, error) {
  sz, err := it.getSize(ctx)
  return graph.IteratorCosts{
    NextCost:     1,
    ContainsCost: 10,
    Size:         sz,
  }, err
// String renders the iterator's query as SQL text using a builder for the
// store's query dialect.
func (it *iterator2) String() string {
  builder := NewBuilder(it.qs.flavor.QueryDialect)
  return it.query.SQL(builder)
}

        基礎迭代器

// iteratorBase holds the state shared by the SQL iterators that embed it
// (iteratorNext and iteratorContains).
type iteratorBase struct {
  qs    *QuadStore // store the query is executed against
  query Select     // query being iterated
  cols []string // column names of query; filled lazily by ensureColumns
  cind map[quad.Direction]int // column index per direction; quad.Any marks the node column
  err  error // set by Contains when the query or row iteration reports an error
  res  graph.Ref // NOTE(review): presumably the current result — confirm
  tags map[string]graph.Ref // per-column values stored by scanValue, keyed by column name
}

掃描行結果

// ensureColumns lazily loads the query's column names into it.cols and builds
// it.cind, the lookup from quad direction to column index.
//
// Only columns whose name starts with tagPref are indexed: the tagNode column
// is recorded under quad.Any, and any other prefixed column is matched by the
// remainder of its name against the String() of each quad direction.
// The method is a no-op once it.cols is non-nil.
func (it *iteratorBase) ensureColumns() {
  if it.cols != nil {
    return
  }
  it.cols = it.query.Columns()
  it.cind = make(map[quad.Direction]int, len(quad.Directions)+1)

  for idx, col := range it.cols {
    switch {
    case !strings.HasPrefix(col, tagPref):
      // Column without the prefix: nothing to index.
    case col == tagNode:
      it.cind[quad.Any] = idx
    default:
      dirName := strings.TrimPrefix(col, tagPref)
      for _, dir := range quad.Directions {
        if dir.String() == dirName {
          it.cind[dir] = idx
          break
        }
      }
    }
  }
}

掃描值

// scanValue reads the current row of r into one NodeHash per query column.
// NOTE(review): this excerpt is abridged by the article — the Scan error
// branch, the result assignment and the closing braces are elided.
func (it *iteratorBase) scanValue(r *sql.Rows) bool {
  it.ensureColumns()
  // One scan target per column; Scan receives pointers into nodes.
  nodes := make([]NodeHash, len(it.cols))
  pointers := make([]interface{}, len(nodes))
  for i := range pointers {
    pointers[i] = &nodes[i]
  }
  if err := r.Scan(pointers...); err != nil {
      for i, name := range it.cols {
    // Columns whose name does not contain tagPref are stored in it.tags under their name.
    // NOTE(review): ensureColumns tests the prefix with HasPrefix, this uses Contains — confirm intent.
    if !strings.Contains(name, tagPref) {
      it.tags[name] = nodes[i].ValueHash
    }
// iteratorNext is the iterator variant created by newIteratorNext; it embeds
// iteratorBase and keeps a *sql.Rows cursor.
type iteratorNext struct {
  iteratorBase
  cursor *sql.Rows // NOTE(review): presumably the open result set being iterated — confirm
  // TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
  nextPathRes  graph.Ref
  nextPathTags map[string]graph.Ref
}
// iteratorContains is the iterator variant that implements Contains; it embeds
// iteratorBase.
type iteratorContains struct {
  iteratorBase
  // TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
  // Contains keeps the latest result set here when query.nextPath is set and
  // closes the previous one before replacing it.
  nextPathRows *sql.Rows
}

進行層序遍歷,先遍歷 next,結束後調用 nextpath

// Contains reports whether the iterator's query yields a row matching v.
//
// The query is copied and narrowed with equality conditions derived from v:
//   - NodeHash: the node column (registered under quad.Any) must equal v;
//   - QuadHashes: every quad direction must have a column, and each direction
//     whose hash is valid must equal that hash;
//   - any other type, or a missing column, yields false without querying.
//
// The narrowed query is executed and its first row, if any, is scanned with
// scanValue. Query and row errors are stored in it.err and yield false. When
// query.nextPath is set the result set is kept in it.nextPathRows (closing the
// previously kept one); otherwise it is closed on return.
func (it *iteratorContains) Contains(ctx context.Context, v graph.Ref) bool {
  it.ensureColumns()

  // Narrow a copy that owns its Where slice, so it.query is left untouched.
  narrowed := it.query
  narrowed.Where = append([]Where{}, narrowed.Where...)

  switch ref := v.(type) {
  default:
    return false
  case NodeHash:
    idx, found := it.cind[quad.Any]
    if !found {
      return false
    }
    field := it.query.Fields[idx]
    narrowed.WhereEq(field.Table, field.Name, ref)
  case QuadHashes:
    for _, dir := range quad.Directions {
      idx, found := it.cind[dir]
      if !found {
        return false
      }
      // Directions without a valid hash put no constraint on the query.
      if hash := ref.Get(dir); hash.Valid() {
        field := it.query.Fields[idx]
        narrowed.WhereEq(field.Table, field.Name, NodeHash{hash})
      }
    }
  }

  rows, err := it.qs.Query(ctx, narrowed)
  if err != nil {
    it.err = err
    return false
  }
  if !it.query.nextPath {
    defer rows.Close()
  } else {
    // Keep the rows open for later use, releasing the previous set first.
    if prev := it.nextPathRows; prev != nil {
      _ = prev.Close()
    }
    it.nextPathRows = rows
  }

  if rows.Next() {
    return it.scanValue(rows)
  }
  it.err = rows.Err()
  return false
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/H1G_tm3uD_5HdRcE0vo3gw