create account

[GO CODE REVIEW REQUEST] Multithreading Ingestron, the STEEM blockchain-eater by faddat

View this thread on: hive.blogpeakd.comecency.com
· @faddat · (edited)
$62.97
[GO CODE REVIEW REQUEST] Multithreading Ingestron, the STEEM blockchain-eater
## <center>Show me (and my code)</center>
# <center> NO MERCY!</center>

<center>I'm looking for feedback on:</center>

* Bugs
* Style
* Idioms
* Multithreading Technique
* Package Selection
....more or less everything

Here's [github](https://github.com/faddat/ingestron), make sure to look at the ["multithreaded" branch](https://github.com/faddat/ingestron/tree/multithreaded).  
```
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
  "gopkg.in/go-playground/pool.v3"
	"github.com/astaxie/flatmap"
	"github.com/go-steem/rpc"
	"github.com/go-steem/rpc/transports/websocket"
	r "gopkg.in/dancannon/gorethink.v2"
)

func main() {
	if err := run(); err != nil {
		log.Fatalln("Error:", err)
	}
}

// Set the settings for the DB
func run() (err error) {
	Rsession, err := r.Connect(r.ConnectOpts{
		Addresses: []string{"192.168.194.91:28015", "192.168.194.209:28015", "192.168.194.98:28015", "192.168.194.145:28015"},
	})
	if err != nil {
		log.Fatalln(err.Error())
	}

	// Create a table in the DB
	var rethinkdbname = "steemit75"
	_, err = r.DBCreate(rethinkdbname).RunWrite(Rsession)
	Rsession.Use(rethinkdbname)
	if err != nil {
		fmt.Println("rethindb DB already made")
	}

	_, err = r.DB(rethinkdbname).TableCreate("transactions").RunWrite(Rsession)
	if err != nil {
		fmt.Println("Probably already made a table for transactions")

	}

	_, err = r.DB(rethinkdbname).TableCreate("flatblocks").RunWrite(Rsession)
	if err != nil {
		fmt.Println("Probably already made a table for flat blocks")

	}

	_, err = r.DB(rethinkdbname).TableCreate("operations").RunWrite(Rsession)
	if err != nil {
		fmt.Println("Probably already made a table for flat blocks")

	}

	// Process flags.
	flagAddress := flag.String("rpc_endpoint", "ws://192.168.194.91:8090", "steemd RPC endpoint address")
	flagReconnect := flag.Bool("reconnect", true, "enable auto-reconnect mode")
	flag.Parse()

	var (
		url       = *flagAddress
		reconnect = *flagReconnect
	)

	// Start catching signals.
	var interrupted bool
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

	// Drop the error in case it is a request being interrupted.
	defer func() {
		if err == websocket.ErrClosing && interrupted {
			err = nil
		}
	}()
	// This allows you to tell the app which block to start on.
	// TODO: Make all of the vars into a config file and package the binaries

	// Start the connection monitor.
	monitorChan := make(chan interface{}, 1)
	if reconnect {
		go func() {
			for {
				event, ok := <-monitorChan
				if ok {
					log.Println(event)
				}
			}
		}()
	}

	// Instantiate the WebSocket transport.
	log.Printf("---> Dial(\"%v\")\n", url)
	t, err := websocket.NewTransport(url,
		websocket.SetAutoReconnectEnabled(reconnect),
		websocket.SetAutoReconnectMaxDelay(30*time.Second),
		websocket.SetMonitor(monitorChan))
	if err != nil {
		return err
	}

	// Use the transport to get an RPC client.
	client, err := rpc.NewClient(t)
	if err != nil {
		return err
	}
	defer func() {
		if !interrupted {
			client.Close()
		}
	}()

	// Start processing signals.
	go func() {
		<-signalCh
		fmt.Println()
		log.Println("Signal received, exiting...")
		signal.Stop(signalCh)
		interrupted = true
		client.Close()
	}()

	// Keep processing incoming blocks forever.
	fmt.Println("---> Entering the block processing loop")
	for {
		props, err := client.Database.GetDynamicGlobalProperties()
		lb := int(props.LastIrreversibleBlockNum)
		p := pool.NewLimited(100)
		defer p.Close()
		batch := p.Batch()
		go func() {
			for U := 1; lb < U; U++ {
				batch.Queue(readsandWrites(U, Rsession, client))
				fmt.Println(U)
			}
			batch.QueueComplete()
		}()
		if err != nil {
			return err
		}
	}
}

func readsandWrites(U int, Rsession *r.Session, client *rpc.Client) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		u := uint32(U)
		// blockchain reads
		block, err := client.Database.GetBlock(u)
		blockraw, err := client.Database.GetBlockRaw(u)
		var data = blockraw
		var mp map[string]interface{}
		if err := json.Unmarshal([]byte(*data), &mp); err != nil {
			log.Fatal(err)
		}
		Fm, err := flatmap.Flatten(mp)
		// Rethinkdb writes
		r.Table("transactions").
			Insert(block.Transactions).
			Exec(Rsession)
		r.Table("flatblocks").
			Insert(Fm).
			Exec(Rsession)
		r.Table("nestedblocks").
			Insert(block).
			Exec(Rsession)
			fmt.Println(u)
			if err != nil {
				log.Fatal(err)
			}
			return true, nil
		}
}
```

NOTE: This usually runs against a 4-machine Skylake/Dual SSD/32GB per machine cluster.  Please either substitute your own cluster by changing code IP addresses or contact me for VPN info for mine.  Thanks for your help!

----
If this kind of stuff tickles your fancy, follow me, @faddat
👍  , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ,
properties (23)
authorfaddat
permlinkgo-code-review-request-multithreading-ingestron-the-steem-blockchain-eater
categorybeyondbitcoin
json_metadata{"tags":["beyondbitcoin","steem","golang","docker","linux"],"users":["faddat"],"links":["https://github.com/faddat/ingestron","https://github.com/faddat/ingestron/tree/multithreaded"]}
created2016-09-14 22:57:51
last_update2016-09-15 01:40:00
depth0
children0
last_payout2016-10-16 00:54:03
cashout_time1969-12-31 23:59:59
total_payout_value51.449 HBD
curator_payout_value11.516 HBD
pending_payout_value0.000 HBD
promoted0.000 HBD
body_length4,791
author_reputation36,581,868,473,026
root_title"[GO CODE REVIEW REQUEST] Multithreading Ingestron, the STEEM blockchain-eater"
beneficiaries[]
max_accepted_payout1,000,000.000 HBD
percent_hbd10,000
post_id1,248,332
net_rshares32,404,166,151,317
author_curate_reward""
vote details (57)