golang 源碼分析:cayley-10
下面分析 mysql 作爲後端存儲時是如何存儲數據的。它的核心源碼位於 graph/sql/mysql/mysql.go:首先定義存儲類型爲 mysql,然後進行註冊。
// Type is the name under which the MySQL backend registers itself with the
// SQL quad store.
const (
	Type = "mysql"
)
func init() {
csql.Register(Type, csql.Registration{
Driver: "mysql",
HashType: fmt.Sprintf(`BINARY(%d)`, quad.HashSize),
func runTxMysql(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error {
for _, n := range nodes {
if n.RefInc >= 0 {
nodeKey, values, err := csql.NodeValues(csql.NodeHash{n.Hash}, n.Val)
if err != nil {
return err
}
values = append([]interface{}{n.RefInc}, values...)
values = append(values, n.RefInc) // one more time for UPDATE
stmt, ok := insertValue[nodeKey]
if !ok {
var ph = make([]string, len(values)-1) // excluding last increment
for i := range ph {
ph[i] = "?"
}
stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` +
strings.Join(nodeKey.Columns(), ", ") +
`) VALUES (` + strings.Join(ph, ", ") +
`) ON DUPLICATE KEY UPDATE refs = refs + ?;`)
_, err = stmt.Exec(values...)
for _, d := range quads {
dirs := make([]interface{}, 0, len(quad.Directions))
for _, h := range d.Quad.Dirs() {
dirs = append(dirs, csql.NodeHash{h}.SQLValue())
}
if !d.Del {
if insertQuad == nil {
insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)
if err != nil {
_, err := insertQuad.Exec(dirs...)
runTxMysql 實現了把頂點(nodes)和四元組(quads)寫入 mysql:插入節點時使用 ON DUPLICATE KEY UPDATE,在記錄已存在時累加引用計數 refs。
func convInsertError(err error) error {
graph/sql/database.go
// types is the registry of SQL backends, keyed by the name passed to
// Register, which stores each Registration here.
var types = map[string]Registration{}
func Register(name string, f Registration) {
if f.Driver == "" {
panic("no sql driver in type definition")
}
types[name] = f
registerQuadStore(name, name)
// Registration describes a SQL backend for the quad store: the database/sql
// driver to use, column types for the generated DDL, which optional SQL
// features the database supports, and optional database-specific hooks.
// Backends register it by name with Register (graph/sql/mysql, for example,
// registers Type with Driver "mysql" and a BINARY hash column type).
type Registration struct {
// Driver must be non-empty; Register panics otherwise.
Driver string // sql driver to use on dial
// Empty HashType and BytesType make nodesTable/quadsTable fall back to BYTEA.
HashType string // type for hash fields
BytesType string // type for binary fields
// Empty TimeType falls back to "timestamp with time zone" in nodesTable.
TimeType string // type for datetime fields
// Empty HorizonType falls back to SERIAL in quadsTable.
HorizonType string // type for horizon counter
// When non-empty, appended as a final element of the CREATE TABLE nodes statement.
NodesTableExtra string // extra SQL to append to nodes table definition
// When set, the two unique quad indexes are created as partial indexes split
// on whether label_hash IS NULL.
ConditionalIndexes bool // database supports conditional indexes
// When set, the secondary quad indexes get WITH (FILLFACTOR = n), where n is
// the "db_fill_factor" option (default 50).
FillFactor bool // database supports fill percent on indexes
// When set, no foreign-key constraints from quads to nodes are created.
NoForeignKeys bool // database has no support for FKs
// QueryDialect is passed to NewBuilder when rendering SQL for queries.
QueryDialect
NoOffsetWithoutLimit bool // SELECT ... OFFSET can be used only with LIMIT
Error func(error) error // error conversion function
Estimated func(table string) string // builds a query that returns an estimated number of rows in table
// NOTE(review): RunTx appears to be an optional backend-specific routine that
// applies node and quad updates inside a transaction (graph/sql/mysql defines
// runTxMysql with this exact signature) — confirm where it is invoked.
RunTx func(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error
// NOTE(review): presumably wraps stmts with backend-specific retry logic — confirm.
TxRetry func(tx *sql.Tx, stmts func() error) error
// NOTE(review): judging by the name, set when the database cannot run schema
// changes inside a transaction — confirm against its users.
NoSchemaChangesInTx bool
}
創建節點表
// nodesTable returns the CREATE TABLE statement for the nodes table.
//
// An empty HashType or BytesType defaults to BYTEA, and an empty TimeType
// defaults to "timestamp with time zone". A non-empty NodesTableExtra is
// appended as one more comma-separated element just before the closing
// parenthesis.
func (r Registration) nodesTable() string {
	orDefault := func(v, fallback string) string {
		if v != "" {
			return v
		}
		return fallback
	}
	hashType := orDefault(r.HashType, "BYTEA")
	bytesType := orDefault(r.BytesType, "BYTEA")
	timeType := orDefault(r.TimeType, "timestamp with time zone")

	tail := "\n);"
	if extra := r.NodesTableExtra; extra != "" {
		tail = ",\n" + extra + tail
	}
	return `CREATE TABLE nodes (
hash ` + hashType + ` PRIMARY KEY,
refs INT NOT NULL,
value ` + bytesType + `,
value_string TEXT,
datatype TEXT,
language TEXT,
iri BOOLEAN,
bnode BOOLEAN,
value_int BIGINT,
value_bool BOOLEAN,
value_float double precision,
value_time ` + timeType + tail
}
創建四元組表
// quadsTable returns the CREATE TABLE statement for the quads table. Each row
// has a "horizon" primary key, the hashes of its subject, predicate and
// object (NOT NULL), a nullable label hash, and a timestamp.
//
// An empty HashType defaults to BYTEA and an empty HorizonType defaults to
// SERIAL.
func (r Registration) quadsTable() string {
	hashType, horizonType := r.HashType, r.HorizonType
	if hashType == "" {
		hashType = "BYTEA"
	}
	if horizonType == "" {
		horizonType = "SERIAL"
	}
	return `CREATE TABLE quads (
horizon ` + horizonType + ` PRIMARY KEY,
subject_hash ` + hashType + ` NOT NULL,
predicate_hash ` + hashType + ` NOT NULL,
object_hash ` + hashType + ` NOT NULL,
label_hash ` + hashType + `,
ts timestamp
);`
}
創建相關索引
// quadIndexes returns, in execution order, the DDL statements that add
// constraints and indexes to the quads table:
//   - two UNIQUE indexes over (subject, predicate, object) and
//     (subject, predicate, object, label); when ConditionalIndexes is set they
//     are partial indexes split on whether label_hash IS NULL;
//   - unless NoForeignKeys is set, one foreign key per hash column
//     referencing nodes (hash);
//   - four secondary lookup indexes, each named after the direction prefixes
//     of its columns, with a FILLFACTOR clause when FillFactor is set.
//
// The fill factor comes from the "db_fill_factor" option (default 50).
func (r Registration) quadIndexes(options graph.Options) []string {
	uniqueSPO := `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash);`
	uniqueSPOL := `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash);`
	if r.ConditionalIndexes {
		uniqueSPO = `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash) WHERE label_hash IS NULL;`
		uniqueSPOL = `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash) WHERE label_hash IS NOT NULL;`
	}

	stmts := make([]string, 0, 10)
	stmts = append(stmts, uniqueSPO, uniqueSPOL)

	if !r.NoForeignKeys {
		stmts = append(stmts, []string{
			`ALTER TABLE quads ADD CONSTRAINT subject_hash_fk FOREIGN KEY (subject_hash) REFERENCES nodes (hash);`,
			`ALTER TABLE quads ADD CONSTRAINT predicate_hash_fk FOREIGN KEY (predicate_hash) REFERENCES nodes (hash);`,
			`ALTER TABLE quads ADD CONSTRAINT object_hash_fk FOREIGN KEY (object_hash) REFERENCES nodes (hash);`,
			`ALTER TABLE quads ADD CONSTRAINT label_hash_fk FOREIGN KEY (label_hash) REFERENCES nodes (hash);`,
		}...)
	}

	// The IntKey error is discarded, as before.
	// NOTE(review): confirm IntKey yields a usable value (presumably the
	// default) when the option has an unexpected type.
	fillFactor, _ := options.IntKey("db_fill_factor", 50)
	// The WITH clause is the same for every lookup index, so build it once.
	withClause := ""
	if r.FillFactor {
		withClause = fmt.Sprintf(" WITH (FILLFACTOR = %d)", fillFactor)
	}

	lookups := [][3]quad.Direction{
		{quad.Subject, quad.Predicate, quad.Object},
		{quad.Object, quad.Predicate, quad.Subject},
		{quad.Predicate, quad.Object, quad.Subject},
		{quad.Object, quad.Subject, quad.Predicate},
	}
	for _, dirs := range lookups {
		prefix := ""
		columns := make([]string, 0, len(dirs))
		for _, d := range dirs {
			prefix += string(d.Prefix())
			columns = append(columns, d.String()+"_hash")
		}
		stmt := fmt.Sprintf(`CREATE INDEX %s_index ON quads (%s)`, prefix, strings.Join(columns, ", "))
		stmts = append(stmts, stmt+withClause+";")
	}
	return stmts
}
graph/sql/iterator.go
// Compile-time check that *QuadStore implements shape.Optimizer.
var _ shape.Optimizer = (*QuadStore)(nil)
func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) {
return qs.opt.OptimizeShape(s)
根據查詢,組裝 sql 語句:
func (qs *QuadStore) prepareQuery(s Shape) (string, []interface{}) {
args := s.Args()
vals := make([]interface{}, 0, len(args))
for _, a := range args {
vals = append(vals, a.SQLValue())
}
b := NewBuilder(qs.flavor.QueryDialect)
qu := s.SQL(b)
return qu, vals
執行查詢
func (qs *QuadStore) QueryRow(ctx context.Context, s Shape) *sql.Row {
qu, vals := qs.prepareQuery(s)
return qs.db.QueryRowContext(ctx, qu, vals...)
// Compile-time check that *Iterator implements graph.IteratorFuture.
var _ graph.IteratorFuture = (*Iterator)(nil)
func (qs *QuadStore) NewIterator(s Select) *Iterator {
it := &Iterator{
it: qs.newIterator(s),
}
it.Iterator = graph.NewLegacy(it.it, it)
return it
// Iterator is the SQL iterator returned by QuadStore.NewIterator. Field it
// holds the shape-level iterator2; the embedded graph.Iterator is the legacy
// adapter NewIterator builds from it with graph.NewLegacy.
type Iterator struct {
it *iterator2
graph.Iterator
}
迭代器分兩層:對外暴露的 Iterator 內部持有 iterator2,並通過 graph.NewLegacy 把它包裝成舊版的 graph.Iterator。
// Compile-time check that *iterator2 implements graph.IteratorShapeCompat.
var _ graph.IteratorShapeCompat = (*iterator2)(nil)
// iterator2 is the shape-level SQL iterator: it keeps the store and the
// Select query it runs, and builds scanners from them in Iterate.
type iterator2 struct {
qs *QuadStore
query Select
// NOTE(review): not assigned in the visible code; presumably records an
// iterator-level error — confirm.
err error
}
// Iterate returns the scanner produced by newIteratorNext for this iterator's
// store and query.
func (it *iterator2) Iterate() graph.Scanner {
	store, sel := it.qs, it.query
	return newIteratorNext(store, sel)
}
func (it *iterator2) Stats(ctx context.Context) (graph.IteratorCosts, error) {
sz, err := it.getSize(ctx)
return graph.IteratorCosts{
NextCost: 1,
ContainsCost: 10,
Size: sz,
}, err
// String renders the iterator's query as SQL, using a builder configured with
// the store's query dialect.
func (it *iterator2) String() string {
	builder := NewBuilder(it.qs.flavor.QueryDialect)
	return it.query.SQL(builder)
}
基礎迭代器
// iteratorBase holds the state shared by the SQL scanners; iteratorNext and
// iteratorContains embed it.
type iteratorBase struct {
qs *QuadStore
query Select // query whose rows are scanned
cols []string // result column names; filled lazily by ensureColumns
cind map[quad.Direction]int // column index per quad direction (quad.Any = node column); built by ensureColumns
err error // last error, e.g. from a failed query in Contains
// NOTE(review): not assigned in the visible code; presumably the current
// result set by scanValue — confirm.
res graph.Ref
tags map[string]graph.Ref // values of tagged columns, written by scanValue
}
確定結果列:獲取查詢的列名,並計算節點列以及各個四元組方向對應的列下標(只計算一次)
// ensureColumns lazily resolves the query's column names into it.cols and
// builds it.cind, which maps quad directions to column positions. Only
// columns prefixed with tagPref are indexed: tagNode maps to quad.Any, and
// tagPref followed by a direction's String() maps to that direction. The work
// is done once; later calls return immediately while it.cols is non-nil.
func (it *iteratorBase) ensureColumns() {
	if it.cols != nil {
		return
	}
	it.cols = it.query.Columns()
	it.cind = make(map[quad.Direction]int, len(quad.Directions)+1)
	for idx, col := range it.cols {
		switch {
		case !strings.HasPrefix(col, tagPref):
			// Not a tag column; nothing to record.
		case col == tagNode:
			it.cind[quad.Any] = idx
		default:
			dirName := strings.TrimPrefix(col, tagPref)
			for _, d := range quad.Directions {
				if d.String() == dirName {
					it.cind[d] = idx
					break
				}
			}
		}
	}
}
掃描值
func (it *iteratorBase) scanValue(r *sql.Rows) bool {
it.ensureColumns()
nodes := make([]NodeHash, len(it.cols))
pointers := make([]interface{}, len(nodes))
for i := range pointers {
pointers[i] = &nodes[i]
}
if err := r.Scan(pointers...); err != nil {
for i, name := range it.cols {
if !strings.Contains(name, tagPref) {
it.tags[name] = nodes[i].ValueHash
}
// iteratorNext is the forward-scanning SQL scanner (presumably the one built
// by newIteratorNext — confirm).
type iteratorNext struct {
iteratorBase
// NOTE(review): presumably the open result set being scanned — confirm.
cursor *sql.Rows
// TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
nextPathRes graph.Ref
nextPathTags map[string]graph.Ref
}
// iteratorContains answers membership checks by re-running the base query
// with extra equality filters.
type iteratorContains struct {
iteratorBase
// TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
// nextPathRows is the last result set kept open for NextPath when the query
// is marked nextPath; the previous one is closed when it is replaced.
nextPathRows *sql.Rows
}
Contains 判斷給定的節點或四元組是否在查詢結果中:在原查詢的副本上追加等值過濾條件後重新查詢,命中時掃描第一行;如果查詢需要 nextPath,則保留結果集供之後的 NextPath 使用。
// Contains reports whether v appears in the result of the iterator's query.
//
// It re-runs a copy of the query with extra equality filters:
//   - for a NodeHash, on the node column (the one indexed as quad.Any);
//   - for a QuadHashes, on each direction column whose hash in v is valid.
//
// It returns false when a required column is missing from the query, when v
// has any other type, when the query fails (the error is stored in it.err),
// or when no row matches. On a match the first row is passed to scanValue,
// and its result is returned.
//
// When the query is marked nextPath, the result set is kept open in
// it.nextPathRows (closing the previously kept one) instead of being closed
// on return.
//
// NOTE(review): on a miss, it.err is overwritten with rows.Err(), which is nil
// when nothing failed, so an earlier recorded error is cleared — confirm this
// is intended.
func (it *iteratorContains) Contains(ctx context.Context, v graph.Ref) bool {
	it.ensureColumns()

	// Filter a copy of the query. Where is cloned so that appending filters
	// never writes into a backing array shared with it.query.
	filtered := it.query
	filtered.Where = append([]Where{}, filtered.Where...)

	// restrict adds "column == h" for the query field at position idx.
	restrict := func(idx int, h NodeHash) {
		field := it.query.Fields[idx]
		filtered.WhereEq(field.Table, field.Name, h)
	}

	switch ref := v.(type) {
	case NodeHash:
		idx, found := it.cind[quad.Any]
		if !found {
			return false
		}
		restrict(idx, ref)
	case QuadHashes:
		for _, dir := range quad.Directions {
			idx, found := it.cind[dir]
			if !found {
				return false
			}
			// Directions without a valid hash in v are left unconstrained.
			if h := ref.Get(dir); h.Valid() {
				restrict(idx, NodeHash{h})
			}
		}
	default:
		return false
	}

	rows, err := it.qs.Query(ctx, filtered)
	if err != nil {
		it.err = err
		return false
	}
	if !it.query.nextPath {
		defer rows.Close()
	} else {
		// Keep these rows open for NextPath; release the previously kept set.
		if prev := it.nextPathRows; prev != nil {
			_ = prev.Close()
		}
		it.nextPathRows = rows
	}

	if rows.Next() {
		return it.scanValue(rows)
	}
	it.err = rows.Err()
	return false
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/H1G_tm3uD_5HdRcE0vo3gw