Search Options

Results per page
Sort
Preferred Languages
Advanced

Results 1 - 10 of 13 for StreamItems (0.17 sec)

  1. internal/store/store.go

    			if !ok {
    				return
    			}
    
    			if !send(key) {
    				return
    			}
    		case <-doneCh:
    			return
    		}
    	}
    }
    
    // StreamItems reads the keys from the store and replays the corresponding item to the target.
    func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger) {
    	go func() {
    		keyCh := replayItems(store, doneCh, logger, target.Name())
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 4.2K bytes
    - Viewed (0)
  2. internal/event/target/nsq.go

    		id:         event.TargetID{ID: id, Name: "nsq"},
    		args:       args,
    		loggerOnce: loggerOnce,
    		store:      queueStore,
    		quitCh:     make(chan struct{}),
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 7.1K bytes
    - Viewed (0)
  3. internal/event/target/webhook.go

    		case "http":
    			target.addr += ":80"
    		case "https":
    			target.addr += ":443"
    		default:
    			return nil, errors.New("unsupported scheme")
    		}
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.cancelCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 8.8K bytes
    - Viewed (0)
  4. internal/event/target/mqtt.go

    		id:         event.TargetID{ID: id, Name: "mqtt"},
    		args:       args,
    		store:      queueStore,
    		quitCh:     make(chan struct{}),
    		loggerOnce: loggerOnce,
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 8.2K bytes
    - Viewed (0)
  5. internal/event/target/redis.go

    		args:       args,
    		pool:       pool,
    		store:      queueStore,
    		loggerOnce: loggerOnce,
    		quitCh:     make(chan struct{}),
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 9.1K bytes
    - Viewed (0)
  6. internal/event/target/kafka.go

    				Limit:         args.BatchSize,
    				Log:           loggerOnce,
    				Store:         queueStore,
    				CommitTimeout: args.BatchCommitTimeout,
    			})
    		}
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    }
    
    func isKafkaConnErr(err error) bool {
    	// Sarama opens the circuit breaker after 3 consecutive connection failures.
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 13.6K bytes
    - Viewed (0)
  7. internal/event/target/amqp.go

    		id:         event.TargetID{ID: id, Name: "amqp"},
    		args:       args,
    		loggerOnce: loggerOnce,
    		store:      queueStore,
    		quitCh:     make(chan struct{}),
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 10K bytes
    - Viewed (0)
  8. internal/event/target/postgresql.go

    		args:       args,
    		firstPing:  false,
    		store:      queueStore,
    		connString: connStr,
    		loggerOnce: loggerOnce,
    		quitCh:     make(chan struct{}),
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    }
    
    var errInvalidPsqlTablename = errors.New("invalid PostgreSQL table")
    
    func validatePsqlTableName(name string) error {
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 13.3K bytes
    - Viewed (0)
  9. internal/logger/target/kafka/kafka.go

    		return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
    	}
    	ctx, cancel := context.WithCancel(ctx)
    	h.store = queueStore
    	h.storeCtxCancel = cancel
    	store.StreamItems(h.store, h, ctx.Done(), h.kconfig.LogOnce)
    	return
    }
    
    func (h *Target) startKafkaLogger() {
    	h.logChMu.RLock()
    	logCh := h.logCh
    	if logCh != nil {
    		// We are not allowed to add when logCh is nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 10.2K bytes
    - Viewed (0)
  10. internal/event/target/mysql.go

    		args:       args,
    		firstPing:  false,
    		store:      queueStore,
    		loggerOnce: loggerOnce,
    		quitCh:     make(chan struct{}),
    	}
    
    	if target.store != nil {
    		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
    	}
    
    	return target, nil
    Registered: Sun Nov 03 19:28:11 UTC 2024
    - Last Modified: Fri Sep 06 23:06:30 UTC 2024
    - 11.6K bytes
    - Viewed (0)
Back to top