// Command main demonstrates context.WithCancel: a generator goroutine is
// stopped by cancelling the context once the consumer has seen enough values.
package main

import (
	"context"
	"fmt"
)

func main() {
	// gen starts a goroutine that emits 1, 2, 3, ... on the returned channel.
	// The goroutine exits as soon as ctx is cancelled, so callers must cancel
	// the context when they stop reading or the goroutine is leaked.
	gen := func(ctx context.Context) <-chan int {
		out := make(chan int)
		go func() {
			for next := 1; ; next++ {
				select {
				case out <- next:
					// Value delivered; the loop post-statement advances the counter.
				case <-ctx.Done():
					return // stop producing so the goroutine does not leak
				}
			}
		}()
		return out
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the generator goroutine once main is done reading

	stream := gen(ctx)
	for v := range stream {
		fmt.Println(v)
		if v != 5 {
			continue
		}
		break
	}
}

// Article text following this listing: next is the WithDeadline example from
// the official documentation.
// Command main demonstrates context.WithDeadline: the context expires after
// 50ms, well before the one-second timer, so the context error is printed.
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	deadline := time.Now().Add(50 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)

	// The context expires on its own, but its cancel function is still called
	// so the context and its parent are not kept alive longer than necessary.
	defer cancel()

	overslept := time.After(1 * time.Second)
	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	case <-overslept:
		fmt.Println("overslept")
	}
}

// Article text following this listing: with the basics above covered, suppose
// etcd is used for configuration management. After a configuration change, how
// are the affected servers notified? The following example demonstrates it.
package mainimport ("github.com/coreos/etcd/clientv3""time""fmt""context")func main() {cli,err := clientv3.New(clientv3.Config{Endpoints:[]string{"192.168.0.118:2371"},DialTimeout:5*time.Second,})if err != nil {fmt.Println("connect failed,err:",err)return}defer cli.Close()// 这里会阻塞rch := cli.Watch(context.Background(),"logagent/conf/")for wresp := range rch{for _,ev := range wresp.Events{fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)}}}实现一个kafka的消费者代码的简单例子:
package mainimport ("github.com/Shopify/sarama""strings""fmt""time")func main() {consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)if err != nil{fmt.Println("failed to start consumer:",err)return}partitionList,err := consumer.Partitions("Nginx_log")if err != nil {fmt.Println("Failed to get the list of partitions:",err)return}fmt.Println(partitionList)for partition := range partitionList{pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)if err != nil {fmt.Printf("failed to start consumer for partition %d:%sn",partition,err)return}defer pc.AsyncClose()go func(partitionConsumer sarama.PartitionConsumer){for msg := range pc.Messages(){fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}}(pc)}time.Sleep(time.Hour)consumer.Close()}但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现
package mainimport ("github.com/Shopify/sarama""strings""fmt""sync")var (wg sync.WaitGroup)func main() {consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)if err != nil{fmt.Println("failed to start consumer:",err)return}partitionList,err := consumer.Partitions("nginx_log")if err != nil {fmt.Println("Failed to get the list of partitions:",err)return}fmt.Println(partitionList)for partition := range partitionList{pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)if err != nil {fmt.Printf("failed to start consumer for partition %d:%sn",partition,err)return}defer pc.AsyncClose()go func(partitionConsumer sarama.PartitionConsumer){wg.Add(1)for msg := range partitionConsumer.Messages(){fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}wg.Done()}(pc)}//time.Sleep(time.Hour)wg.Wait()consumer.Close()}将客户端需要收集的日志信息放到etcd中关于etcd处理的代码为:
package mainimport ("github.com/coreos/etcd/clientv3""time""github.com/astaxie/beego/logs""context""fmt")var Client *clientv3.Clientvar logConfChan chan string// 初始化etcdfunc initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){var keys []stringfor _,ip := range ipArrays{//keyfmt = /logagent/%s/log_configkeys = Append(keys,fmt.Sprintf(keyfmt,ip))}logConfChan = make(chan string,10)logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)Client,err = clientv3.New(clientv3.Config{Endpoints:addr,DialTimeout: timeout,})if err != nil{logs.Error("connect failed,err:%v",err)return}logs.Debug("init etcd success")waitGroup.Add(1)for _, key := range keys{ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)// 从etcd中获取要收集日志的信息resp,err := Client.Get(ctx,key)cancel()if err != nil {logs.Warn("get key %s failed,err:%v",key,err)continue}for _, ev := range resp.Kvs{logs.Debug("%q : %qn",ev.Key, ev.Value)logConfChan <- string(ev.Value)}}go WatchEtcd(keys)return}func WatchEtcd(keys []string){// 这里用于检测当需要收集的日志信息更改时及时更新var watchChans []clientv3.WatchChanfor _,key := range keys{rch := Client.Watch(context.Background(),key)watchChans = append(watchChans,rch)}for {for _,watchC := range watchChans{select{case wresp := <-watchC:for _,ev:= range wresp.Events{logs.Debug("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)logConfChan <- string(ev.Kv.Value)}default:}}time.Sleep(time.Second)}waitGroup.Done()}func GetLogConf()chan string{return logConfChan}
推荐阅读
- 数据库同步软件DBSync的设计与实现
- 阿里大牛纯CSS实现了常见的UI效果,不信你看
- shell脚本判断奇偶数?shell脚本是什么语言
- 这才是你需要的C语言、C++学习路线
- 三国诸葛亮的思维导图?三国诸葛亮的经典语言
- 华为路由器静态NAT配置,实现一对一地址映射,土豪级,收藏转
- 用Spring Boot实现https ssl免密登录
- go语言操作PostgreSQL数据库
- MySQL还能实现分布式锁?
- Spring Boot实现阿里云SMS短信发送功能
