赞
踩
Node is the place where everything starts.
// Node is a container on which services can be registered.
//
// Besides the registered service constructors and the running services, it
// holds the P2P server and the RPC handlers/listeners (in-process, IPC, HTTP
// and WebSocket) through which the node's APIs are served.
type Node struct {
	eventmux *event.TypeMux // Event multiplexer used between the services of a stack
	config   *Config
	accman   *accounts.Manager

	ephemeralKeystore string            // if non-empty, the key directory that will be removed by Stop
	instanceDirLock   fileutil.Releaser // prevents concurrent use of instance directory

	serverConfig p2p.Config
	server       *p2p.Server // Currently running P2P networking layer

	serviceFuncs []ServiceConstructor     // Service constructors (in dependency order)
	services     map[reflect.Type]Service // Currently running services

	rpcAPIs       []rpc.API   // List of APIs currently provided by the node
	inprocHandler *rpc.Server // In-process RPC request handler to process the API requests

	ipcEndpoint string       // IPC endpoint to listen at (empty = IPC disabled)
	ipcListener net.Listener // IPC RPC listener socket to serve API requests
	ipcHandler  *rpc.Server  // IPC RPC request handler to process the API requests

	httpEndpoint  string       // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
	httpWhitelist []string     // HTTP RPC modules to allow through this endpoint
	httpListener  net.Listener // HTTP RPC listener socket to serve API requests
	httpHandler   *rpc.Server  // HTTP RPC request handler to process the API requests

	wsEndpoint string       // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
	wsListener net.Listener // Websocket RPC listener socket to serve API requests
	wsHandler  *rpc.Server  // Websocket RPC request handler to process the API requests

	stop chan struct{} // Channel to wait for termination notifications
	lock sync.RWMutex

	log log.Logger
}
// Manager is an overarching account manager that can communicate with various // backends for signing transactions. type Manager struct { config *Config // Global account manager configurations backends map[reflect.Type][]Backend // Index of backends currently registered updaters []event.Subscription // Wallet update subscriptions for all backends updates chan WalletEvent // Subscription sink for backend wallet changes wallets []Wallet // Cache of all wallets from all registered backends feed event.Feed // Wallet feed notifying of arrivals/departures quit chan chan error lock sync.RWMutex } // NewManager creates a generic account manager to sign transaction via various // supported backends. func NewManager(config *Config, backends ...Backend) *Manager { // Retrieve the initial list of wallets from the backends and sort by URL var wallets []Wallet for _, backend := range backends { wallets = merge(wallets, backend.Wallets()...) } // Subscribe to wallet notifications from all backends updates := make(chan WalletEvent, 4*len(backends)) subs := make([]event.Subscription, len(backends)) for i, backend := range backends { subs[i] = backend.Subscribe(updates) } // Assemble the account manager and return am := &Manager{ config: config, backends: make(map[reflect.Type][]Backend), updaters: subs, updates: updates, wallets: wallets, quit: make(chan chan error), } for _, backend := range backends { kind := reflect.TypeOf(backend) am.backends[kind] = append(am.backends[kind], backend) } go am.update() return am } // update is the wallet event loop listening for notifications from the backends // and updating the cache of wallets. 
func (am *Manager) update() { // Close all subscriptions when the manager terminates defer func() { am.lock.Lock() for _, sub := range am.updaters { sub.Unsubscribe() } am.updaters = nil am.lock.Unlock() }() // Loop until termination for { select { case event := <-am.updates: // Wallet event arrived, update local cache am.lock.Lock() switch event.Kind { case WalletArrived: am.wallets = merge(am.wallets, event.Wallet) case WalletDropped: am.wallets = drop(am.wallets, event.Wallet) } am.lock.Unlock() // Notify any listeners of the event am.feed.Send(event) case errc := <-am.quit: // Manager terminating, return errc <- nil return } } } // events const ( // WalletArrived is fired when a new wallet is detected either via USB or via // a filesystem event in the keystore. WalletArrived WalletEventType = iota // WalletOpened is fired when a wallet is successfully opened with the purpose // of starting any background processes such as automatic key derivation. WalletOpened // WalletDropped WalletDropped )
The account manager starts a goroutine, ‘am.update()’, to receive update notifications from all wallets. The manager subscribes its update channel to each backend:
subs[i] = backend.Subscribe(updates)
Then the event will be fed to all listeners.
am.feed.Send(event)
func makeFullNode(ctx *cli.Context) *node.Node { stack, cfg := makeConfigNode(ctx) if ctx.GlobalIsSet(utils.OverrideIstanbulFlag.Name) { cfg.Eth.OverrideIstanbul = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideIstanbulFlag.Name)) } if ctx.GlobalIsSet(utils.OverrideMuirGlacierFlag.Name) { cfg.Eth.OverrideMuirGlacier = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideMuirGlacierFlag.Name)) } utils.RegisterEthService(stack, &cfg.Eth) // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode shhEnabled := enableWhisper(ctx) shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name) if shhEnabled || shhAutoEnabled { if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) { cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name)) } if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) { cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name) } if ctx.GlobalIsSet(utils.WhisperRestrictConnectionBetweenLightClientsFlag.Name) { cfg.Shh.RestrictConnectionBetweenLightClients = true } utils.RegisterShhService(stack, &cfg.Shh) } // Configure GraphQL if requested if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) { utils.RegisterGraphQLService(stack, cfg.Node.GraphQLEndpoint(), cfg.Node.GraphQLCors, cfg.Node.GraphQLVirtualHosts, cfg.Node.HTTPTimeouts) } // Add the Ethereum Stats daemon if requested. if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, cfg.Ethstats.URL) } return stack }
The call stack of Ethereum object initialization:
github.com/ethereum/go-ethereum/eth.New at backend.go:119
github.com/ethereum/go-ethereum/cmd/utils.RegisterEthService.func2 at flags.go:1564
github.com/ethereum/go-ethereum/node.(*Node).Start at node.go:206
github.com/ethereum/go-ethereum/cmd/utils.StartNode at cmd.go:67
main.startNode at main.go:327
main.geth at main.go:308
gopkg.in/urfave/cli%2ev1.HandleAction at app.go:490
gopkg.in/urfave/cli%2ev1.(*App).Run at app.go:264
main.main at main.go:248
runtime.main at proc.go:203
runtime.goexit at asm_amd64.s:1357
- Async stack trace
runtime.rt0_go at asm_amd64.s:220
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
//
// It sanitises the configuration, opens the chain database, sets up the
// genesis block, and then wires together the blockchain, bloom indexer,
// transaction pool, protocol manager, miner, API backend with its gas price
// oracle, and the discovery dial candidates. Any failure along the way is
// returned with a nil *Ethereum.
//
// NOTE(review): the error returns after the database has been opened do not
// close chainDb in this function — confirm whether the caller cleans it up.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	// Ensure configuration values are compatible and sane
	if config.SyncMode == downloader.LightSync {
		return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
	}
	if !config.SyncMode.IsValid() {
		return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
	}
	// A missing or non-positive miner gas price is replaced by the default.
	if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 {
		log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", DefaultConfig.Miner.GasPrice)
		config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice)
	}
	// With pruning disabled the dirty cache allowance is folded into the clean cache.
	if config.NoPruning && config.TrieDirtyCache > 0 {
		config.TrieCleanCache += config.TrieDirtyCache
		config.TrieDirtyCache = 0
	}
	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

	// Assemble the Ethereum object
	chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/")
	if err != nil {
		return nil, err
	}
	chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.OverrideIstanbul, config.OverrideMuirGlacier)
	// A *params.ConfigCompatError is tolerated here and handled further down by
	// rewinding the chain; every other genesis error aborts.
	if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
		return nil, genesisErr
	}
	log.Info("Initialised chain configuration", "config", chainConfig)

	eth := &Ethereum{
		config:         config,
		chainDb:        chainDb,
		eventMux:       ctx.EventMux,
		accountManager: ctx.AccountManager,
		engine:         CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
		shutdownChan:   make(chan bool),
		networkID:      config.NetworkId,
		gasPrice:       config.Miner.GasPrice,
		etherbase:      config.Miner.Etherbase,
		bloomRequests:  make(chan chan *bloombits.Retrieval),
		bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
	}

	// Read the stored database version; "<nil>" is logged when none is stored.
	bcVersion := rawdb.ReadDatabaseVersion(chainDb)
	var dbVer = "<nil>"
	if bcVersion != nil {
		dbVer = fmt.Sprintf("%d", *bcVersion)
	}
	log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId, "dbversion", dbVer)

	// Refuse databases newer than this build supports; stamp older or
	// unversioned ones with the current version.
	if !config.SkipBcVersionCheck {
		if bcVersion != nil && *bcVersion > core.BlockChainVersion {
			return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, params.VersionWithMeta, core.BlockChainVersion)
		} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
			log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
			rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
		}
	}
	var (
		vmConfig = vm.Config{
			EnablePreimageRecording: config.EnablePreimageRecording,
			EWASMInterpreter:        config.EWASMInterpreter,
			EVMInterpreter:          config.EVMInterpreter,
		}
		cacheConfig = &core.CacheConfig{
			TrieCleanLimit:      config.TrieCleanCache,
			TrieCleanNoPrefetch: config.NoPrefetch,
			TrieDirtyLimit:      config.TrieDirtyCache,
			TrieDirtyDisabled:   config.NoPruning,
			TrieTimeLimit:       config.TrieTimeout,
		}
	)
	eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
	if err != nil {
		return nil, err
	}
	// Rewind the chain in case of an incompatible config upgrade.
	if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
		log.Warn("Rewinding chain to upgrade configuration", "err", compat)
		eth.blockchain.SetHead(compat.RewindTo)
		rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
	}
	eth.bloomIndexer.Start(eth.blockchain)

	// Resolve a configured journal path through the service context before
	// creating the transaction pool.
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)

	// Permit the downloader to use the trie cache allowance during fast sync
	cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit
	// Fall back to the trusted checkpoint registered for this genesis hash
	// when none is configured explicitly.
	checkpoint := config.Checkpoint
	if checkpoint == nil {
		checkpoint = params.TrustedCheckpoints[genesisHash]
	}
	if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
	eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

	// The gas price oracle defaults to the miner gas price when the GPO
	// configuration carries no default of its own.
	eth.APIBackend = &EthAPIBackend{ctx.ExtRPCEnabled(), eth, nil}
	gpoParams := config.GPO
	if gpoParams.Default == nil {
		gpoParams.Default = config.Miner.GasPrice
	}
	eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

	eth.dialCandiates, err = eth.setupDiscovery(&ctx.Config.P2P)
	if err != nil {
		return nil, err
	}

	return eth, nil
}
Start Service will start Ethereum.
// Start implements node.Service, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start(srvr *p2p.Server) error { s.startEthEntryUpdate(srvr.LocalNode()) // Start the bloom bits servicing goroutines s.startBloomHandlers(params.BloomBitsBlocks) // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) // Figure out a max peers count based on the server limits maxPeers := srvr.MaxPeers if s.config.LightServ > 0 { if s.config.LightPeers >= srvr.MaxPeers { return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) } maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } return nil }
// Start records the peer limit, subscribes to new-transaction and mined-block
// events, and launches the broadcast and sync goroutines. Each subscription is
// set up before the goroutine that consumes it is started.
func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers

	// broadcast transactions
	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
	go pm.txBroadcastLoop()

	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go pm.minedBroadcastLoop()

	// start sync handlers
	go pm.syncer()
	go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx exchange, drop with eth/64.
}
The Ethereum sub-protocol manager manages peers that are compatible with the Ethereum network.
// Start records the peer limit, subscribes to new-transaction and mined-block
// events, and launches the broadcast and sync goroutines. Each subscription is
// set up before the goroutine that consumes it is started.
func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers

	// broadcast transactions
	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
	go pm.txBroadcastLoop()

	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go pm.minedBroadcastLoop()

	// start sync handlers
	go pm.syncer()
	go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx exchange, drop with eth/64.
}
Start a goroutine that loops accepting inbound connections. Start a background goroutine that updates the NAT mapping.
This goroutine uses a Go channel to implement a limiter on the maximum number of allowed connections: defaultMaxPendingPeers = 50, or the --maxpendpeers value.
Any inbound connection will be handled by:
// SetupConn wraps fd in a conn, runs the handshakes on it and attempts to add
// the connection as a peer. It returns once the connection has been added as a
// peer or the handshakes have failed; on failure the connection is closed with
// the error that is returned.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{
		fd:        fd,
		transport: srv.newTransport(fd),
		flags:     flags,
		cont:      make(chan error),
	}
	if err := srv.setupConn(c, flags, dialDest); err != nil {
		c.close(err)
		return err
	}
	return nil
}
As a result, any valid connection will be accepted as a peer, by sending a message to srv.checkpointAddPeer.
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { closeCtx, cancel := context.WithCancel(context.Background()) t := &UDPv4{ conn: c, priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, localNode: ln, db: ln.Database(), gotreply: make(chan reply), addReplyMatcher: make(chan *replyMatcher), closeCtx: closeCtx, cancelCloseCtx: cancel, log: cfg.Log, } if t.log == nil { t.log = log.Root() } tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) if err != nil { return nil, err } t.tab = tab go tab.loop() t.wg.Add(2) go t.loop() go t.readLoop(cfg.Unhandled) return t, nil }
It is loosely inspired by the Kademlia DHT, but unlike most DHTs no arbitrary keys and values are stored. Instead, the DHT stores and relays ‘node records’, which are signed documents providing information about nodes in the network. Node Discovery acts as a database of all live nodes in the network and performs three basic functions:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。