赞
踩
在fabric中,与外界的操作,基本都是通过链码(智能合约)来实现,所以说,链码称为链应用对外的API是完全可以的。不过,此API非常见的那种开发的的API,它指用户编写的应用代码,外界可以通过部署其到Fabric上,完成和链的交互通信,将数据存储到链上或者查询链上存储的数据。它分为两大类,即系统链码和用户链码。系统链码就是随链安装就存在的,一般用来处理链节点自身功能的代码;用户链码是通过编写代码完成与链交互操作的代码。
系统链码主要包括以下几类:
CSCC(Configuration System Chaincode):负责帐本和链自身的配置管理。
ESCC(Endorsement System Chaincode):背书管理系统链码,提供背书过程,也可以说对背书策略进行管理。
LSCC(Lifecycle System Chaincode):生命周期系统链码,提供对链码的生命周期进行管理,包括安装、部署、升级等。
QSCC(Query System Chaincode):查询系统链码,提供链上的数据查询。
VSCC(Verification System Chaincode):验证系统链码,用来提供交前的背书策略检查。
通过上述可以发现,系统链码主要处理逻辑基本都是链自身需要的。
链码可以认为是一个在Docker容器中运行的操作系统进程,通过RPC和链进行通信。
知道了链码的内容,链启动时首先会启动相关的系统链码,所以首先分析一下系统链码的启动源码:
在背书的过程中,看到了serve函数(peer/node/start.go)中sccp.DeploySysCCs("", ccp)中的加载过程:
//DeploySysCCs is the hook for system chaincodes where system chaincodes are registered with the fabric //note the chaincode must still be deployed and launched like a user chaincode will be func (p *Provider) DeploySysCCs(chainID string, ccp ccprovider.ChaincodeProvider) { for _, sysCC := range p.SysCCs { deploySysCC(chainID, ccp, sysCC) } } // CCContext pass this around instead of string of args type CCContext struct { // Name chaincode name Name string // Version used to construct the chaincode image and register Version string } // deploySysCC deploys the given system chaincode on a chain func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error { //条件判断,系统链码的标志位和白名单是否允许 if !syscc.Enabled() || !isWhitelisted(syscc) { sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s) disabled", syscc.Name(), syscc.Path())) return nil } //生成UUID txid := util.GenerateUUID() // Note, this structure is barely initialized, // we omit the history query executor, the proposal // and the signed proposal交易参数 txParams := &ccprovider.TransactionParams{ TxID: txid, ChannelID: chainID, } if chainID != "" { //通过通道ID获得帐本 lgr := peer.GetLedger(chainID) if lgr == nil { panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID)) } //生成交易模拟器 txsim, err := lgr.NewTxSimulator(txid) if err != nil { return err } txParams.TXSimulator = txsim //退出后释放模拟器资源 defer txsim.Done() } //生成ChaincodeID实例,设置系统链码的路径和名称 chaincodeID := &pb.ChaincodeID{Path: syscc.Path(), Name: syscc.Name()} //生成描述对象, spec := &pb.ChaincodeSpec{Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]), ChaincodeId: chaincodeID, Input: &pb.ChaincodeInput{Args: syscc.InitArgs()}} chaincodeDeploymentSpec := &pb.ChaincodeDeploymentSpec{ExecEnv: pb.ChaincodeDeploymentSpec_SYSTEM, ChaincodeSpec: spec} // XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary version := util.GetSysCCVersion() cccid := &ccprovider.CCContext{ Name: chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeId.Name, Version: version, } //初始化 resp, _, err := ccprov.ExecuteLegacyInit(txParams, cccid, chaincodeDeploymentSpec) if err == nil && resp.Status != shim.OK { err = errors.New(resp.Message) } sysccLogger.Infof("system chaincode %s/%s(%s) deployed", syscc.Name(), chainID, syscc.Path()) return err }
看一下初始化的代码
// ExecuteLegacyInit executes a chaincode which is not in the LSCC table func (c *CCProviderImpl) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) { return c.cs.ExecuteLegacyInit(txParams, cccid, spec) } // ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle // is entirely deprecated. Ideally one release after the introduction of the new lifecycle. // It does not attempt to start the chaincode based on the information from lifecycle, but instead // accepts the container information directly in the form of a ChaincodeDeploymentSpec. func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) { ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec) ccci.Version = cccid.Version //启动运行时 err := cs.LaunchInit(ccci) if err != nil { return nil, nil, err } //获得链码实例句柄 cname := ccci.Name + ":" + ccci.Version h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname) } resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h) return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err) } func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error { var startFailCh chan error var timeoutCh <-chan time.Time startTime := time.Now() cname := ccci.Name + ":" + ccci.Version launchState, alreadyStarted := r.Registry.Launching(cname) if !alreadyStarted { startFailCh = make(chan error, 1) timeoutCh = time.NewTimer(r.StartupTimeout).C codePackage, err := r.getCodePackage(ccci) if err != nil { return err } go func() { //启动容器 if err := r.Runtime.Start(ccci, codePackage); err != nil { startFailCh <- errors.WithMessage(err, "error starting container") return } exitCode, err := r.Runtime.Wait(ccci) if err != nil { launchState.Notify(errors.Wrap(err, "failed to wait on container exit")) } launchState.Notify(errors.Errorf("container exited with %d", exitCode)) }() } var err error select { case <-launchState.Done(): err = errors.WithMessage(launchState.Err(), "chaincode registration failed") case err = <-startFailCh: launchState.Notify(err) r.Metrics.LaunchFailures.With("chaincode", cname).Add(1) case <-timeoutCh: err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname) launchState.Notify(err) r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1) } success := true if err != nil && !alreadyStarted { success = false chaincodeLogger.Debugf("stopping due to error while launching: %+v", err) defer r.Registry.Deregister(cname) if err := r.Runtime.Stop(ccci); err != nil { chaincodeLogger.Debugf("stop failed: %+v", err) } } r.Metrics.LaunchDuration.With( "chaincode", cname, "success", strconv.FormatBool(success), ).Observe(time.Since(startTime).Seconds()) chaincodeLogger.Debug("launch complete") return err } // Start launches chaincode in a runtime environment.启动docker容器 func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error { cname := ccci.Name + ":" + ccci.Version lc, err := c.LaunchConfig(cname, ccci.Type) if err != nil { return err } chaincodeLogger.Debugf("start container: %s", cname) chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " ")) chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t")) scr := container.StartContainerReq{ Builder: &container.PlatformBuilder{ Type: ccci.Type, Name: ccci.Name, Version: ccci.Version, Path: ccci.Path, CodePackage: codePackage, PlatformRegistry: c.PlatformRegistry, }, Args: lc.Args, Env: lc.Envs, FilesToUpload: lc.Files, CCID: ccintf.CCID{ Name: ccci.Name, Version: ccci.Version, }, } //此处将虚拟机启动 if err := c.Processor.Process(ccci.ContainerType, scr); err != nil { return errors.WithMessage(err, "error starting container") } return nil } // execute executes a transaction and waits for it to complete until a timeout value. func (cs *ChaincodeSupport) execute(cctyp pb.ChaincodeMessage_Type, txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput, h *Handler) (*pb.ChaincodeMessage, error) { input.Decorations = txParams.ProposalDecorations ccMsg, err := createCCMessage(cctyp, txParams.ChannelID, txParams.TxID, input) if err != nil { return nil, errors.WithMessage(err, "failed to create chaincode message") } ccresp, err := h.Execute(txParams, cccid, ccMsg, cs.ExecuteTimeout) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error sending")) } return ccresp, nil } func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) { chaincodeLogger.Debugf("Entry") defer chaincodeLogger.Debugf("Exit") txParams.CollectionStore = h.getCollectionStore(msg.ChannelId) txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT) txctx, err := h.TXContexts.Create(txParams) if err != nil { return nil, err } defer h.TXContexts.Delete(msg.ChannelId, msg.Txid) if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil { return nil, err } //发送通信消息 h.serialSendAsync(msg) var ccresp *pb.ChaincodeMessage select { case ccresp = <-txctx.ResponseNotifier: // response is sent to user or calling chaincode. ChaincodeMessage_ERROR // are typically treated as error case <-time.After(timeout): err = errors.New("timeout expired while executing transaction") ccName := cccid.Name + ":" + cccid.Version h.Metrics.ExecuteTimeouts.With("chaincode", ccName).Add(1) case <-h.streamDone(): err = errors.New("chaincode stream terminated") } return ccresp, err }
可以用下面的函数停止系统链码
// deDeploySysCC stops the system chaincode and deregisters it from inproccontroller func deDeploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error { // XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary version := util.GetSysCCVersion() ccci := &ccprovider.ChaincodeContainerInfo{ Type: "GOLANG", Name: syscc.Name(), Path: syscc.Path(), Version: version, ContainerType: inproccontroller.ContainerType, } err := ccprov.Stop(ccci) return err }
这样在Peer节点中的启动流程就OK了。下一步重点是要处理一下在Dockers容器的启动操作。这段和用户链码中的一致,就放到下面的用户链码中一起分析学习。
而在背书中已经分析了系统链码的初始化(peer.Initialize)相关代码,如果有疑问可参看。
系统链码的使用方式,安装,实例化、调用和查询;打包即打包后的安装;链码的升级:
peer chaincode install -p chaincodedev/chaincode/sacc -n mycc -v 0
peer chaincode instantiate -n mycc -v 0 -c '{"Args":["a","10"]}' -C myc
peer chaincode invoke -n mycc -c '{"Args":["set", "a", "20"]}' -C myc
peer chaincode query -n mycc -c '{"Args":["query","a"]}' -C myc
peer chaincode package -n mycc -p chaincodedev/chaincode/sacc -v 0 -s -S -i "AND('OrgA.admin')" ccpack.out
peer chaincode signpackage ccpack.out signedccpack.out
peer chaincode install -p chaincodedev/chaincode/sacc -n mycc -v 1
peer chaincode upgrade -n mycc -v 1 -c '{"Args":["a", "100"]}' -C myc
上面是比较常用的一些链码的操作命令,下面就针对上面的命令对其进行源码的解析
用户链码的装载代码如下:
//是否还记得Peer节点中的命令装载和启动 // installCmd returns the cobra command for Chaincode Deploy func installCmd(cf *ChaincodeCmdFactory) *cobra.Command { chaincodeInstallCmd = &cobra.Command{ Use: "install", Short: fmt.Sprint(installDesc), Long: fmt.Sprint(installDesc), ValidArgs: []string{"1"}, RunE: func(cmd *cobra.Command, args []string) error { var ccpackfile string if len(args) > 0 { ccpackfile = args[0] } return chaincodeInstall(cmd, ccpackfile, cf) }, } flagList := []string{ "lang", "ctor", "path", "name", "version", "peerAddresses", "tlsRootCertFiles", "connectionProfile", } attachFlags(chaincodeInstallCmd, flagList) return chaincodeInstallCmd } // chaincodeInstall installs the chaincode. If remoteinstall, does it via a lscc call func chaincodeInstall(cmd *cobra.Command, ccpackfile string, cf *ChaincodeCmdFactory) error { // Parsing of the command line is done so silence cmd usage cmd.SilenceUsage = true var err error if cf == nil { //初始化默认的客户端 cf, err = InitCmdFactory(cmd.Name(), true, false) if err != nil { return err } } var ccpackmsg proto.Message if ccpackfile == "" { if chaincodePath == common.UndefinedParamValue || chaincodeVersion == common.UndefinedParamValue || chaincodeName == common.UndefinedParamValue { return fmt.Errorf("Must supply value for %s name, path and version parameters.", chainFuncName) } //generate a raw ChaincodeDeploymentSpec判断链码是否安装;获得相关数据结构并返回 ccpackmsg, err = genChaincodeDeploymentSpec(cmd, chaincodeName, chaincodeVersion) if err != nil { return err } } else { //read in a package generated by the "package" sub-command (and perhaps signed //by multiple owners with the "signpackage" sub-command) var cds *pb.ChaincodeDeploymentSpec ccpackmsg, cds, err = getPackageFromFile(ccpackfile) if err != nil { return err } //get the chaincode details from cds cName := cds.ChaincodeSpec.ChaincodeId.Name cVersion := cds.ChaincodeSpec.ChaincodeId.Version //if user provided chaincodeName, use it for validation if chaincodeName != "" && chaincodeName != cName { return fmt.Errorf("chaincode name %s does not match name %s in package", chaincodeName, cName) } //if user provided chaincodeVersion, use it for validation if chaincodeVersion != "" && chaincodeVersion != cVersion { return fmt.Errorf("chaincode version %s does not match version %s in packages", chaincodeVersion, cVersion) } } //真正安装 err = install(ccpackmsg, cf) return err } //install the depspec to "peer.address" func install(msg proto.Message, cf *ChaincodeCmdFactory) error { creator, err := cf.Signer.Serialize() if err != nil { return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err) } //创建一个安装提案 prop, _, err := utils.CreateInstallProposalFromCDS(msg, creator) if err != nil { return fmt.Errorf("Error creating proposal %s: %s", chainFuncName, err) } var signedProp *pb.SignedProposal signedProp, err = utils.GetSignedProposal(prop, cf.Signer) if err != nil { return fmt.Errorf("Error creating signed proposal %s: %s", chainFuncName, err) } // install is currently only supported for one peer看看这到哪儿了,是不是非常熟悉---看下面的PB中实现的这个函数 proposalResponse, err := cf.EndorserClients[0].ProcessProposal(context.Background(), signedProp) if err != nil { return fmt.Errorf("Error endorsing %s: %s", chainFuncName, err) } if proposalResponse != nil { if proposalResponse.Response.Status != int32(pcommon.Status_SUCCESS) { return errors.Errorf("Bad response: %d - %s", proposalResponse.Response.Status, proposalResponse.Response.Message) } logger.Infof("Installed remotely %v", proposalResponse) } else { return errors.New("Error during install: received nil proposal response") } return nil } //看一下INSTALL的数据结构 // Carries the chaincode function and its arguments. // UnmarshalJSON in transaction.go converts the string-based REST/JSON input to // the []byte-based current ChaincodeInput structure. type ChaincodeInput struct { Args [][]byte `protobuf:"bytes,1,rep,name=args,proto3" json:"args,omitempty"` Decorations map[string][]byte `protobuf:"bytes,2,rep,name=decorations,proto3" json:"decorations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } // createProposalFromCDS returns a deploy or upgrade proposal given a // serialized identity and a ChaincodeDeploymentSpec func createProposalFromCDS(chainID string, msg proto.Message, creator []byte, propType string, args ...[]byte) (*peer.Proposal, string, error) { // in the new mode, cds will be nil, "deploy" and "upgrade" are instantiates. var ccinp *peer.ChaincodeInput var b []byte var err error if msg != nil { b, err = proto.Marshal(msg) if err != nil { return nil, "", err } } switch propType { case "deploy": fallthrough case "upgrade": cds, ok := msg.(*peer.ChaincodeDeploymentSpec) if !ok || cds == nil { return nil, "", errors.New("invalid message for creating lifecycle chaincode proposal") } Args := [][]byte{[]byte(propType), []byte(chainID), b} Args = append(Args, args...) ccinp = &peer.ChaincodeInput{Args: Args} case "install": ccinp = &peer.ChaincodeInput{Args: [][]byte{[]byte(propType), b}} } //创建一个lscc来提供对链码安装的功能 // wrap the deployment in an invocation spec to lscc... lsccSpec := &peer.ChaincodeInvocationSpec{ ChaincodeSpec: &peer.ChaincodeSpec{ Type: peer.ChaincodeSpec_GOLANG, ChaincodeId: &peer.ChaincodeID{Name: "lscc"}, Input: ccinp, }, } // ...and get the proposal for it return CreateProposalFromCIS(common.HeaderType_ENDORSER_TRANSACTION, chainID, lsccSpec, creator) } // CreateChaincodeProposalWithTxIDNonceAndTransient creates a proposal from // given input func CreateChaincodeProposalWithTxIDNonceAndTransient(txid string, typ common.HeaderType, chainID string, cis *peer.ChaincodeInvocationSpec, nonce, creator []byte, transientMap map[string][]byte) (*peer.Proposal, string, error) { ccHdrExt := &peer.ChaincodeHeaderExtension{ChaincodeId: cis.ChaincodeSpec.ChaincodeId} ccHdrExtBytes, err := proto.Marshal(ccHdrExt) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeHeaderExtension") } cisBytes, err := proto.Marshal(cis) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeInvocationSpec") } ccPropPayload := &peer.ChaincodeProposalPayload{Input: cisBytes, TransientMap: transientMap} ccPropPayloadBytes, err := proto.Marshal(ccPropPayload) if err != nil { return nil, "", errors.Wrap(err, "error marshaling ChaincodeProposalPayload") } // TODO: epoch is now set to zero. This must be changed once we // get a more appropriate mechanism to handle it in. var epoch uint64 timestamp := util.CreateUtcTimestamp() hdr := &common.Header{ ChannelHeader: MarshalOrPanic( &common.ChannelHeader{ Type: int32(typ), TxId: txid, Timestamp: timestamp, ChannelId: chainID, Extension: ccHdrExtBytes, Epoch: epoch, }, ), SignatureHeader: MarshalOrPanic( &common.SignatureHeader{ Nonce: nonce, Creator: creator, }, ), } hdrBytes, err := proto.Marshal(hdr) if err != nil { return nil, "", err } prop := &peer.Proposal{ Header: hdrBytes, Payload: ccPropPayloadBytes, } return prop, txid, nil }
再看一下承接Proposal的函数:
func (c *endorserClient) ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error) {
out := new(ProposalResponse)
err := c.cc.Invoke(ctx, "/protos.Endorser/ProcessProposal", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
这里的Invoke会产生一系列的Invoke然后发送消息到背书结点,背书结点还会有ProcessProposal来处理相关的消息,然后最终调用lscc.go中的Invoke:
func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response { args := stub.GetArgs() if len(args) < 1 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } function := string(args[0]) // Handle ACL: // 1. get the signed proposal sp, err := stub.GetSignedProposal() if err != nil { return shim.Error(fmt.Sprintf("Failed retrieving signed proposal on executing %s with error %s", function, err)) } switch function { case INSTALL: if len(args) < 2 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } // 2. check local MSP Admins policy if err = lscc.PolicyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil { return shim.Error(fmt.Sprintf("access denied for [%s]: %s", function, err)) } depSpec := args[1] err := lscc.executeInstall(stub, depSpec) if err != nil { return shim.Error(err.Error()) } return shim.Success([]byte("OK")) case DEPLOY, UPGRADE: // we expect a minimum of 3 arguments, the function // name, the chain name and deployment spec if len(args) < 3 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } // channel the chaincode should be associated with. It // should be created with a register call channel := string(args[1]) if !lscc.isValidChannelName(channel) { return shim.Error(InvalidChannelNameErr(channel).Error()) } ac, exists := lscc.SCCProvider.GetApplicationConfig(channel) if !exists { logger.Panicf("programming error, non-existent appplication config for channel '%s'", channel) } // the maximum number of arguments depends on the capability of the channel if !ac.Capabilities().PrivateChannelData() && len(args) > 6 { return shim.Error(PrivateChannelDataNotAvailable("").Error()) } if ac.Capabilities().PrivateChannelData() && len(args) > 7 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } depSpec := args[2] cds := &pb.ChaincodeDeploymentSpec{} err := proto.Unmarshal(depSpec, cds) if err != nil { return shim.Error(fmt.Sprintf("error unmarshaling ChaincodeDeploymentSpec: %s", err)) } // optional arguments here (they can each be nil and may or may not be present) // args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy // args[4] is the name of escc // args[5] is the name of vscc // args[6] is a marshalled CollectionConfigPackage struct var EP []byte if len(args) > 3 && len(args[3]) > 0 { EP = args[3] } else { p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel)) EP, err = utils.Marshal(p) if err != nil { return shim.Error(err.Error()) } } var escc []byte if len(args) > 4 && len(args[4]) > 0 { escc = args[4] } else { escc = []byte("escc") } var vscc []byte if len(args) > 5 && len(args[5]) > 0 { vscc = args[5] } else { vscc = []byte("vscc") } var collectionsConfig []byte // we proceed with a non-nil collection configuration only if // we Support the PrivateChannelData capability if ac.Capabilities().PrivateChannelData() && len(args) > 6 { collectionsConfig = args[6] } cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function) if err != nil { return shim.Error(err.Error()) } cdbytes, err := proto.Marshal(cd) if err != nil { return shim.Error(err.Error()) } return shim.Success(cdbytes) case CCEXISTS, CHAINCODEEXISTS, GETDEPSPEC, GETDEPLOYMENTSPEC, GETCCDATA, GETCHAINCODEDATA: if len(args) != 3 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } channel := string(args[1]) ccname := string(args[2]) // 2. check policy for ACL resource var resource string switch function { case CCEXISTS, CHAINCODEEXISTS: resource = resources.Lscc_ChaincodeExists case GETDEPSPEC, GETDEPLOYMENTSPEC: resource = resources.Lscc_GetDeploymentSpec case GETCCDATA, GETCHAINCODEDATA: resource = resources.Lscc_GetChaincodeData } if err = lscc.ACLProvider.CheckACL(resource, channel, sp); err != nil { return shim.Error(fmt.Sprintf("access denied for [%s][%s]: %s", function, channel, err)) } cdbytes, err := lscc.getCCInstance(stub, ccname) if err != nil { logger.Errorf("error getting chaincode %s on channel [%s]: %s", ccname, channel, err) return shim.Error(err.Error()) } switch function { case CCEXISTS, CHAINCODEEXISTS: cd, err := lscc.getChaincodeData(ccname, cdbytes) if err != nil { return shim.Error(err.Error()) } return shim.Success([]byte(cd.Name)) case GETCCDATA, GETCHAINCODEDATA: return shim.Success(cdbytes) case GETDEPSPEC, GETDEPLOYMENTSPEC: _, depspecbytes, err := lscc.getCCCode(ccname, cdbytes) if err != nil { return shim.Error(err.Error()) } return shim.Success(depspecbytes) default: panic("unreachable") } case GETCHAINCODES, GETCHAINCODESALIAS: if len(args) != 1 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } if err = lscc.ACLProvider.CheckACL(resources.Lscc_GetInstantiatedChaincodes, stub.GetChannelID(), sp); err != nil { return shim.Error(fmt.Sprintf("access denied for [%s][%s]: %s", function, stub.GetChannelID(), err)) } return lscc.getChaincodes(stub) case GETINSTALLEDCHAINCODES, GETINSTALLEDCHAINCODESALIAS: if len(args) != 1 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } // 2. check local MSP Admins policy if err = lscc.PolicyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil { return shim.Error(fmt.Sprintf("access denied for [%s]: %s", function, err)) } return lscc.getInstalledChaincodes() case GETCOLLECTIONSCONFIG, GETCOLLECTIONSCONFIGALIAS: if len(args) != 2 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } chaincodeName := string(args[1]) logger.Debugf("GetCollectionsConfig, chaincodeName:%s, start to check ACL for current identity policy", chaincodeName) if err = lscc.ACLProvider.CheckACL(resources.Lscc_GetCollectionsConfig, stub.GetChannelID(), sp); err != nil { logger.Debugf("ACL Check Failed for channel:%s, chaincode:%s", stub.GetChannelID(), chaincodeName) return shim.Error(fmt.Sprintf("access denied for [%s]: %s", function, err)) } return lscc.getChaincodeCollectionData(stub, chaincodeName) } return shim.Error(InvalidFunctionErr(function).Error()) } // executeInstall implements the "install" Invoke transaction func (lscc *LifeCycleSysCC) executeInstall(stub shim.ChaincodeStubInterface, ccbytes []byte) error { ccpack, err := ccprovider.GetCCPackage(ccbytes) if err != nil { return err } cds := ccpack.GetDepSpec() if cds == nil { return fmt.Errorf("nil deployment spec from from the CC package") } if err = lscc.isValidChaincodeName(cds.ChaincodeSpec.ChaincodeId.Name); err != nil { return err } if err = lscc.isValidChaincodeVersion(cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version); err != nil { return err } if lscc.SCCProvider.IsSysCC(cds.ChaincodeSpec.ChaincodeId.Name) { return errors.Errorf("cannot install: %s is the name of a system chaincode", cds.ChaincodeSpec.ChaincodeId.Name) } // Get any statedb artifacts from the chaincode package, e.g. couchdb index definitions statedbArtifactsTar, err := ccprovider.ExtractStatedbArtifactsFromCCPackage(ccpack, lscc.PlatformRegistry) if err != nil { return err } if err = isValidStatedbArtifactsTar(statedbArtifactsTar); err != nil { return InvalidStatedbArtifactsErr(err.Error()) } chaincodeDefinition := &cceventmgmt.ChaincodeDefinition{ Name: ccpack.GetChaincodeData().Name, Version: ccpack.GetChaincodeData().Version, Hash: ccpack.GetId()} // Note - The chaincode 'id' is the hash of chaincode's (CodeHash || MetaDataHash), aka fingerprint // HandleChaincodeInstall will apply any statedb artifacts (e.g. couchdb indexes) to // any channel's statedb where the chaincode is already instantiated // Note - this step is done prior to PutChaincodeToLocalStorage() since this step is idempotent and harmless until endorsements start, // that is, if there are errors deploying the indexes the chaincode install can safely be re-attempted later. err = cceventmgmt.GetMgr().HandleChaincodeInstall(chaincodeDefinition, statedbArtifactsTar) defer func() { cceventmgmt.GetMgr().ChaincodeInstallDone(err == nil) }() if err != nil { return err } // Finally, if everything is good above, install the chaincode to local peer file system so that endorsements can start if err = lscc.Support.PutChaincodeToLocalStorage(ccpack); err != nil { return err } logger.Infof("Installed Chaincode [%s] Version [%s] to peer", ccpack.GetChaincodeData().Name, ccpack.GetChaincodeData().Version) return nil }
其它几个与之类似,后面如果用到或者再专门详述。
链码的真正作用在于运行起来,和外界进行交互,下面看一个普通的链码运行的过程代码,链码的运行分为Peer侧和容器侧,也就是说,一部分在Peer结点进行处理发送消息和Docker进行通信,另外一部分在Docker一侧接收消息并做执行的动作,同时将数据结果返回到Peer侧。先看一下Peer侧:
//首先从命令Chaincode中看一下Invoke调用的命令 func chaincodeInvokeOrQuery(cmd *cobra.Command, invoke bool, cf *ChaincodeCmdFactory) (err error) { spec, err := getChaincodeSpec(cmd) if err != nil { return err } // call with empty txid to ensure production code generates a txid. // otherwise, tests can explicitly set their own txid txID := "" //看这里 proposalResp, err := ChaincodeInvokeOrQuery( spec, channelID, txID, invoke, cf.Signer, cf.Certificate, cf.EndorserClients, cf.DeliverClients, cf.BroadcastClient) if err != nil { return errors.Errorf("%s - proposal response: %v", err, proposalResp) } if invoke { logger.Debugf("ESCC invoke result: %v", proposalResp) pRespPayload, err := putils.GetProposalResponsePayload(proposalResp.Payload) ...... } return nil } func ChaincodeInvokeOrQuery( spec *pb.ChaincodeSpec, cID string, txID string, invoke bool, signer msp.SigningIdentity, certificate tls.Certificate, endorserClients []pb.EndorserClient, deliverClients []api.PeerDeliverClient, bc common.BroadcastClient, ) (*pb.ProposalResponse, error) { // Build the ChaincodeInvocationSpec message创建链码规范的对象,包括使用语言和ChaincodeID等 invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec} creator, err := signer.Serialize() if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error serializing identity for %s", signer.GetIdentifier())) } funcName := "invoke" if !invoke { funcName = "query" } // extract the transient field if it exists var tMap map[string][]byte if transient != "" { if err := json.Unmarshal([]byte(transient), &tMap); err != nil { return nil, errors.Wrap(err, "error parsing transient string") } } //创建交易 prop, txid, err := putils.CreateChaincodeProposalWithTxIDAndTransient(pcommon.HeaderType_ENDORSER_TRANSACTION, cID, invocation, creator, txID, tMap) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error creating proposal for %s", funcName)) } signedProp, err := putils.GetSignedProposal(prop, signer) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error creating signed proposal for %s", funcName)) } var responses []*pb.ProposalResponse for _, endorser := range endorserClients { //在这又看到了老朋友 proposalResp, err := endorser.ProcessProposal(context.Background(), signedProp) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error endorsing %s", funcName)) } responses = append(responses, proposalResp) } if len(responses) == 0 { // this should only happen if some new code has introduced a bug return nil, errors.New("no proposal responses received - this might indicate a bug") } // all responses will be checked when the signed transaction is created. // for now, just set this so we check the first response's status proposalResp := responses[0] if invoke { if proposalResp != nil { if proposalResp.Response.Status >= shim.ERRORTHRESHOLD { return proposalResp, nil } // assemble a signed transaction (it's an Envelope message) env, err := putils.CreateSignedTx(prop, signer, responses...) if err != nil { return proposalResp, errors.WithMessage(err, "could not assemble transaction") } var dg *deliverGroup var ctx context.Context if waitForEvent { var cancelFunc context.CancelFunc ctx, cancelFunc = context.WithTimeout(context.Background(), waitForEventTimeout) defer cancelFunc() dg = newDeliverGroup(deliverClients, peerAddresses, certificate, channelID, txid) // connect to deliver service on all peers err := dg.Connect(ctx) if err != nil { return nil, err } } // send the envelope for ordering if err = bc.Send(env); err != nil { return proposalResp, errors.WithMessage(err, fmt.Sprintf("error sending transaction for %s", funcName)) } if dg != nil && ctx != nil { // wait for event that contains the txid from all peers err = dg.Wait(ctx) if err != nil { return nil, err } } } } return proposalResp, nil } func (c *endorserClient) ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error) { out := new(ProposalResponse) err := c.cc.Invoke(ctx, "/protos.Endorser/ProcessProposal", in, out, opts...) if err != nil { return nil, err } return out, nil } func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...) } func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
到这儿其实是前面的一些代码的再次回顾,挺简单的,最终Stream流处理后,又开始了gRPC的服务发消息。在前面的讲述Peer初始化和启动时,知道了背书服务的启动和注册,其中服务最终注册到GRPC服务上,而Endorser这个类对象实现了EndoserServer的ProcessProposal:
type EndorserServer interface { ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error) } // ProcessProposal process the Proposal func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) { // start time for computing elapsed time metric for successfully endorsed proposals startTime := time.Now() e.Metrics.ProposalsReceived.Add(1) addr := util.ExtractRemoteAddress(ctx) endorserLogger.Debug("Entering: request from", addr) // variables to capture proposal duration metric var chainID string var hdrExt *pb.ChaincodeHeaderExtension var success bool defer func() { // capture proposal duration metric. hdrExt == nil indicates early failure // where we don't capture latency metric. But the ProposalValidationFailed // counter metric should shed light on those failures. if hdrExt != nil { meterLabels := []string{ "channel", chainID, "chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version, "success", strconv.FormatBool(success), } e.Metrics.ProposalDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds()) } endorserLogger.Debug("Exit: request from", addr) }() // 0 -- check and validate vr, err := e.preProcess(signedProp) if err != nil { resp := vr.resp return resp, err } prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid // obtaining once the tx simulator for this proposal. This will be nil // for chainless proposals // Also obtain a history query executor for history queries, since tx simulator does not cover history var txsim ledger.TxSimulator var historyQueryExecutor ledger.HistoryQueryExecutor if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) { if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } // txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit // of valid write-sets to the stateDB), we must release the lock as early as possible. // Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and // rwset is collected before gossip dissemination if required for privateData. For safety, we // add the following defer statement and is useful when an error occur. Note that calling // txsim.Done() more than once does not cause any issue. If the txsim is already // released, the following txsim.Done() simply returns. defer txsim.Done() if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } } txParams := &ccprovider.TransactionParams{ ChannelID: chainID, TxID: txid, SignedProp: signedProp, Proposal: prop, TXSimulator: txsim, HistoryQueryExecutor: historyQueryExecutor, } // this could be a request to a chainless SysCC // TODO: if the proposal has an extension, it will be of type ChaincodeAction; // if it's present it means that no simulation is to be performed because // we're trying to emulate a submitting peer. On the other hand, we need // to validate the supplied action before endorsing it // 1 -- simulate cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } if res != nil { if res.Status >= shim.ERROR { endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid) var cceventBytes []byte if ccevent != nil { cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent) if err != nil { return nil, errors.Wrap(err, "failed to marshal event bytes") } } pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } return pResp, nil } } // 2 -- endorse and get a marshalled ProposalResponse message var pResp *pb.ProposalResponse // TODO till we implement global ESCC, CSCC for system chaincodes // chainless proposals (such as CSCC) don't have to be endorsed if chainID == "" { pResp = &pb.ProposalResponse{Response: res} } else { // Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd) // if error, capture endorsement failure metric meterLabels := []string{ "channel", chainID, "chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version, } if err != nil { meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(false)) e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1) return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } if pResp.Response.Status >= shim.ERRORTHRESHOLD { // the default ESCC treats all status codes about threshold as errors and fails endorsement // useful to track this as a separate metric meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(true)) e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1) endorserLogger.Debugf("[%s][%s] endorseProposal() resulted in chaincode %s error for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, txid) return pResp, nil } } // Set the proposal response payload - it // contains the "return value" from the // chaincode invocation pResp.Response = res // total failed proposals = ProposalsReceived-SuccessfulProposals e.Metrics.SuccessfulProposals.Add(1) success = true return pResp, nil }
其实上面的代码主要是两个函数的调用,一个是模拟执行SimulateProposal,它会调用执行callChaincode。然后调用:
func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) { ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec) ccci.Version = cccid.Version err := cs.LaunchInit(ccci) if err != nil { return nil, nil, err } cname := ccci.Name + ":" + ccci.Version h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname) } resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h) return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err) } func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) { chaincodeLogger.Debugf("Entry") defer chaincodeLogger.Debugf("Exit") txParams.CollectionStore = h.getCollectionStore(msg.ChannelId) txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT) txctx, err := h.TXContexts.Create(txParams) if err != nil { return nil, err } defer h.TXContexts.Delete(msg.ChannelId, msg.Txid) if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil { return nil, err } h.serialSendAsync(msg) var ccresp *pb.ChaincodeMessage select { case ccresp = <-txctx.ResponseNotifier: // response is sent to user or calling chaincode. ChaincodeMessage_ERROR // are typically treated as error case <-time.After(timeout): err = errors.New("timeout expired while executing transaction") ccName := cccid.Name + ":" + cccid.Version h.Metrics.ExecuteTimeouts.With("chaincode", ccName).Add(1) case <-h.streamDone(): err = errors.New("chaincode stream terminated") } return ccresp, err } // serialSend serializes msgs so gRPC will be happy func (h *Handler) serialSend(msg *pb.ChaincodeMessage) error { h.serialLock.Lock() defer h.serialLock.Unlock() //到这里基本就调到了底层的Stream的流式数据发送 if err := h.chatStream.Send(msg); err != nil { err = errors.WithMessage(err, fmt.Sprintf("[%s] error sending %s", shorttxid(msg.Txid), msg.Type)) chaincodeLogger.Errorf("%+v", err) return err } return nil }
serialSend发送给容器侧的处理,其后的签名等前面分析过,不再赘述,再看一下容器侧:
先看一个简单的链码:
package main // 引入必要的包 import( "fmt" "github.com/hyperledger/fabric/core/chaincode/shim" pb "github.com/hyperledger/fabric/protos/peer" ) // 声明一个结构体 type FirstChaincode struct { } //Init方法 func (f *FirstChaincode) Init(stub shim.ChaincodeStubInterface) pb.Response{ //实现初始化和升级的代码 } //Invoke方法 func (f *FirstChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response{ // 调用代码实现 } // 主函数,重点关注shim.Start方法 func main() { err := shim.Start(new(FirstChaincode)) if err != nil { fmt.Printf("start chaincode err: %s", err) } }
其实重点在Start函数中,分析一下启动的流程(fabric/core/chaincode/shim/chaincode.go):
//fabric/core/chaincode/shim/chaincode.go // chaincodes. func Start(cc Chaincode) error { // If Start() is called, we assume this is a standalone chaincode and set // up formatted logging.设置日志 SetupChaincodeLogging() //得到链码名称 chaincodename := viper.GetString("chaincode.id.name") if chaincodename == "" { return errors.New("error chaincode id not provided") } //设置基础的加密组件 err := factory.InitFactories(factory.GetDefaultOpts()) if err != nil { return errors.WithMessage(err, "internal error, BCCSP could not be initialized with default options") } //mock stream not set up ... get real stream if streamGetter == nil { streamGetter = userChaincodeStreamGetter } //获得链码的数据流-建立相关的服务通信client stream, err := streamGetter(chaincodename) if err != nil { return err } //和容器通信 err = chatWithPeer(chaincodename, stream, cc) return err } //the non-mock user CC stream establishment func func userChaincodeStreamGetter(name string) (PeerChaincodeStream, error) { flag.StringVar(&peerAddress, "peer.address", "", "peer address") if viper.GetBool("peer.tls.enabled") { //相关的密钥地址,在用户安装时指定 keyPath := viper.GetString("tls.client.key.path") certPath := viper.GetString("tls.client.cert.path") //读取数据 data, err1 := ioutil.ReadFile(keyPath) if err1 != nil { err1 = errors.Wrap(err1, fmt.Sprintf("error trying to read file content %s", keyPath)) chaincodeLogger.Errorf("%+v", err1) return nil, err1 } key = string(data) data, err1 = ioutil.ReadFile(certPath) if err1 != nil { err1 = errors.Wrap(err1, fmt.Sprintf("error trying to read file content %s", certPath)) chaincodeLogger.Errorf("%+v", err1) return nil, err1 } cert = string(data) } flag.Parse() chaincodeLogger.Debugf("Peer address: %s", getPeerAddress()) // Establish connection with validating peer //建立与PEER节点的连接 clientConn, err := newPeerClientConnection() if err != nil { err = errors.Wrap(err, "error trying to connect to local peer") chaincodeLogger.Errorf("%+v", err) return nil, err } chaincodeLogger.Debugf("os.Args returns: %s", os.Args) //返回一个和容器通信的Client chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn) // Establish stream with validating peer stream, err := chaincodeSupportClient.Register(context.Background()) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error chatting with leader at address=%s", getPeerAddress())) } return stream, nil } func newPeerClientConnection() (*grpc.ClientConn, error) { var peerAddress = getPeerAddress() // set the keepalive options to match static settings for chaincode server kaOpts := &comm.KeepaliveOptions{ ClientInterval: time.Duration(1) * time.Minute, ClientTimeout: time.Duration(20) * time.Second, } if viper.GetBool("peer.tls.enabled") { return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForShim(key, cert), kaOpts) } //在这个函数里直接拨号连网GRPC return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil, kaOpts) } //chaincodeSupportClient.Register会调用下面的这个函数 // will not call the optionally-configured stats handler with a stats.End message. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) if cc.dopts.streamInt != nil { return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) } return newClientStream(ctx, desc, cc, method, opts...) } //开始通信 func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error { // Create the shim handler responsible for all control logic handler := newChaincodeHandler(stream, cc) //关闭网络gRPC的连接 defer stream.CloseSend() // Send the ChaincodeID during register.通过名称获得链码ID chaincodeID := &pb.ChaincodeID{Name: chaincodename} //解析链码的有效载荷 payload, err := proto.Marshal(chaincodeID) if err != nil { return errors.Wrap(err, "error marshalling chaincodeID during chaincode registration") } // Register on the stream通过GRPC向节点发送REGISTER消息 chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER) if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil { return errors.WithMessage(err, "error sending chaincode REGISTER") } // holds return values from gRPC Recv below type recvMsg struct { msg *pb.ChaincodeMessage err error } msgAvail := make(chan *recvMsg, 1) errc := make(chan error) receiveMessage := func() { in, err := stream.Recv() msgAvail <- &recvMsg{in, err} } //接收由Peer返回的消息 go receiveMessage() for { select { case rmsg := <-msgAvail: switch { case rmsg.err == io.EOF: err = errors.Wrapf(rmsg.err, "received EOF, ending chaincode stream") chaincodeLogger.Debugf("%+v", err) return err case rmsg.err != nil: err := errors.Wrap(rmsg.err, "receive failed") chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err) return err case rmsg.msg == nil: err := errors.New("received nil message, ending chaincode stream") chaincodeLogger.Debugf("%+v", err) return err default: chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(rmsg.msg.Txid), rmsg.msg.Type) //这个是对接收到的消息进行处理的函数 err := handler.handleMessage(rmsg.msg, errc) if err != nil { err = errors.WithMessage(err, "error handling message") return err } //接收完成后再次准备接收消息 go receiveMessage() } case sendErr := <-errc: if sendErr != nil { err := errors.Wrap(sendErr, "error sending") return err } } } } // NewChaincodeHandler returns a new instance of the shim side handler. func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode) *Handler { v := &Handler{ //通信用的流底层实例 ChatStream: peerChatStream, cc: chaincode, } //设置链码的响应通道并设置链码容器的状态为created v.responseChannel = make(map[string]chan pb.ChaincodeMessage) v.state = created return v } // handleMessage message handles loop for shim side of chaincode/peer stream. func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error { //心跳的控制,保证和Peer的连接的正常 if msg.Type == pb.ChaincodeMessage_KEEPALIVE { chaincodeLogger.Debug("Sending KEEPALIVE response") handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work return nil } chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) var err error switch handler.state { case ready: err = handler.handleReady(msg, errc) case established: err = handler.handleEstablished(msg, errc) case created: err = handler.handleCreated(msg, errc) default: err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state) } if err != nil { payload := []byte(err.Error()) errorMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid} handler.serialSend(errorMsg) return err } return nil }
这时候儿还得回头看看Peer对Register的处理:
//此函数中在ChaincodeSupport中注册使用 // handleMessage is called by ProcessStream to dispatch messages. func (h *Handler) handleMessage(msg *pb.ChaincodeMessage) error { chaincodeLogger.Debugf("[%s] Fabric side handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, h.state) if msg.Type == pb.ChaincodeMessage_KEEPALIVE { return nil } switch h.state { case Created: return h.handleMessageCreatedState(msg) case Ready: return h.handleMessageReadyState(msg) default: return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid) } } func (h *Handler) handleMessageCreatedState(msg *pb.ChaincodeMessage) error { switch msg.Type { //看到了Register case pb.ChaincodeMessage_REGISTER: h.HandleRegister(msg) default: return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in created state", msg.Txid, msg.Type) } return nil } // handleRegister is invoked when chaincode tries to register. func (h *Handler) HandleRegister(msg *pb.ChaincodeMessage) { chaincodeLogger.Debugf("Received %s in state %s", msg.Type, h.state) chaincodeID := &pb.ChaincodeID{} err := proto.Unmarshal(msg.Payload, chaincodeID) if err != nil { chaincodeLogger.Errorf("Error in received %s, could NOT unmarshal registration info: %s", pb.ChaincodeMessage_REGISTER, err) return } // Now register with the chaincodeSupport h.chaincodeID = chaincodeID //注册链码到Peer err = h.Registry.Register(h) if err != nil { h.notifyRegistry(err) return } // get the component parts so we can use the root chaincode // name in keys h.ccInstance = ParseName(h.chaincodeID.Name) chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", pb.ChaincodeMessage_REGISTER, chaincodeID, pb.ChaincodeMessage_REGISTERED) //发送返回的注册消息到链码侧 if err := h.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}); err != nil { chaincodeLogger.Errorf("error sending %s: %s", pb.ChaincodeMessage_REGISTERED, err) h.notifyRegistry(err) return } //更新当前状态 h.state = Established chaincodeLogger.Debugf("Changed state to established for %+v", h.chaincodeID) // for dev mode this will also move to ready automatically h.notifyRegistry(nil) } // notifyRegistry will send ready on registration success and // update the launch state of the chaincode in the handler registry. func (h *Handler) notifyRegistry(err error) { if err == nil { err = h.sendReady() } if err != nil { h.Registry.Failed(h.chaincodeID.Name, err) chaincodeLogger.Errorf("failed to start %s", h.chaincodeID) return } h.Registry.Ready(h.chaincodeID.Name) } // sendReady sends READY to chaincode serially (just like REGISTER) func (h *Handler) sendReady() error { chaincodeLogger.Debugf("sending READY for chaincode %+v", h.chaincodeID) ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY} // if error in sending tear down the h if err := h.serialSend(ccMsg); err != nil { chaincodeLogger.Errorf("error sending READY (%s) for chaincode %+v", err, h.chaincodeID) return err } h.state = Ready chaincodeLogger.Debugf("Changed to state ready for chaincode %+v", h.chaincodeID) return nil }
发送Ready消息并更新状态到Ready。这样,又得回到链码铡或者说容器侧,其实还是回到handleMessage函数中,:
switch handler.state {
case ready:
err = handler.handleReady(msg, errc)
case established:
err = handler.handleEstablished(msg, errc)
case created:
err = handler.handleCreated(msg, errc)
default:
err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
}
其实就是根据Peer发送过来的几种消息来回跳转这三个函数:
//handle ready state func (handler *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error { switch msg.Type { case pb.ChaincodeMessage_RESPONSE: if err := handler.sendChannel(msg); err != nil { chaincodeLogger.Errorf("[%s] error sending %s (state:%s): %s", shorttxid(msg.Txid), msg.Type, handler.state, err) return err } chaincodeLogger.Debugf("[%s] Received %s, communicated (state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) return nil case pb.ChaincodeMessage_ERROR: if err := handler.sendChannel(msg); err != nil { chaincodeLogger.Errorf("[%s] error sending %s (state:%s): %s", shorttxid(msg.Txid), msg.Type, handler.state, err) } chaincodeLogger.Debugf("[%s] Error Received %s, communicated (state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) //we don't return error on ERROR return nil case pb.ChaincodeMessage_INIT: chaincodeLogger.Debugf("[%s] Received %s, initializing chaincode", shorttxid(msg.Txid), msg.Type) // Call the chaincode's Run function to initialize handler.handleInit(msg, errc) return nil case pb.ChaincodeMessage_TRANSACTION: chaincodeLogger.Debugf("[%s] Received %s, invoking transaction on chaincode(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) // Call the chaincode's Run function to invoke transaction handler.handleTransaction(msg, errc) return nil } return errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state) }
其它两个比较简单,只有这个Ready,有点小复杂,它还有一个调用:
// handleInit handles request to initialize chaincode. func (handler *Handler) handleInit(msg *pb.ChaincodeMessage, errc chan error) { // The defer followed by triggering a go routine dance is needed to ensure that the previous state transition // is completed before the next one is triggered. The previous state transition is deemed complete only when // the beforeInit function is exited. Interesting bug fix!! go func() { var nextStateMsg *pb.ChaincodeMessage defer func() { //更新链码当前的状态 handler.triggerNextState(nextStateMsg, errc) }() errFunc := func(err error, payload []byte, ce *pb.ChaincodeEvent, errFmt string, args ...interface{}) *pb.ChaincodeMessage { if err != nil { // Send ERROR message to chaincode support and change state if payload == nil { payload = []byte(err.Error()) } chaincodeLogger.Errorf(errFmt, args...) return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid, ChaincodeEvent: ce, ChannelId: msg.ChannelId} } return nil } // Get the function and args from Payload获得有效载荷并反序列化 input := &pb.ChaincodeInput{} unmarshalErr := proto.Unmarshal(msg.Payload, input) if nextStateMsg = errFunc(unmarshalErr, nil, nil, "[%s] Incorrect payload format. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } // Call chaincode's Run // Create the ChaincodeStub which the chaincode can use to callback stub := new(ChaincodeStub) //从发送的提案中获得数据赋值到ChaincodeStub err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal) if nextStateMsg = errFunc(err, nil, stub.chaincodeEvent, "[%s] Init get error response. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } //调用链码本身的init函数 res := handler.cc.Init(stub) chaincodeLogger.Debugf("[%s] Init get response status: %d", shorttxid(msg.Txid), res.Status) if res.Status >= ERROR { err = errors.New(res.Message) if nextStateMsg = errFunc(err, []byte(res.Message), stub.chaincodeEvent, "[%s] Init get error response. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } } resBytes, err := proto.Marshal(&res) if err != nil { payload := []byte(err.Error()) chaincodeLogger.Errorf("[%s] Init marshal response error [%s]. Sending %s", shorttxid(msg.Txid), err, pb.ChaincodeMessage_ERROR) nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent} return } // Send COMPLETED message to chaincode support and change state调用Defer中的函数发送此消息到Peer nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId} chaincodeLogger.Debugf("[%s] Init succeeded. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED) }() }
这样的话,一个链码就成功的实例化,然后可以发送交易执行上面的Case中的pb.ChaincodeMessage_TRANSACTION,调用下面的函数:
// handleTransaction Handles request to execute a transaction. func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) { // The defer followed by triggering a go routine dance is needed to ensure that the previous state transition // is completed before the next one is triggered. The previous state transition is deemed complete only when // the beforeInit function is exited. Interesting bug fix!! go func() { //better not be nil var nextStateMsg *pb.ChaincodeMessage defer func() { handler.triggerNextState(nextStateMsg, errc) }() errFunc := func(err error, ce *pb.ChaincodeEvent, errStr string, args ...interface{}) *pb.ChaincodeMessage { if err != nil { payload := []byte(err.Error()) chaincodeLogger.Errorf(errStr, args...) return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid, ChaincodeEvent: ce, ChannelId: msg.ChannelId} } return nil } // Get the function and args from Payload input := &pb.ChaincodeInput{} unmarshalErr := proto.Unmarshal(msg.Payload, input) if nextStateMsg = errFunc(unmarshalErr, nil, "[%s] Incorrect payload format. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } // Call chaincode's Run // Create the ChaincodeStub which the chaincode can use to callback stub := new(ChaincodeStub) err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal) if nextStateMsg = errFunc(err, stub.chaincodeEvent, "[%s] Transaction execution failed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } //此处调用相关的系统链码或函数来操作具体的数据 res := handler.cc.Invoke(stub) // Endorser will handle error contained in Response. resBytes, err := proto.Marshal(&res) if nextStateMsg = errFunc(err, stub.chaincodeEvent, "[%s] Transaction execution failed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil { return } // Send COMPLETED message to chaincode support and change state chaincodeLogger.Debugf("[%s] Transaction completed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED) nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId} }() }
上述操作完成后,再将数据发送回Peer,又是类似的循环即可。其实就是两个handleMessage在互相的处理想到的数据。
链码分为系统和用户两大类,系统的链码安装是在启动时就开始了,并完成了装载和初始化并启动。而用户链码则需要提供相应的命令来在Peer侧进行处理。然后将相关的消息打包成交易发送到服务端,然后再调用容器服务启动实例化,并提供相关的接口调用。只有明白了链码运行的流程,才能从深层次上掌握链码的实质并加以运用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。