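I'm using the Logstash throttle plugin to limit the rate of events. If I set after_count manually, everything works. However, I'm trying to fetch this value from Redis, store it in a new field, and then use that field in the throttle plugin. My config file looks like this: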
filter {
  ruby {
    init => 'require "redis"; $redis = Redis.new(host: "127.0.0.1", port: 6379, db: 5)'
    code => 'event.set("limit", $redis.get("limit"))'
  }
  mutate {
    convert => {
      "limit" => "integer"
    }
  }
  throttle {
    after_count => %{limit}
    period => 1
    max_age => 2
    key => "[message]"
    add_tag => "throttled"
  }
  if "throttled" in [tags] {
    drop { }
  }
}
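The logs show no errors, but events stop being indexed when I run this configuration. Can anyone help? Is there a way to throttle based on a value stored in Redis?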
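No, you cannot use sprintf references in the before_count or after_count options of the throttle filter. The code uses the option values directly and does not sprintf them.
"%{limit}" is converted to zero, so "count > 0" always evaluates to true, and every event is tagged and dropped.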
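To make the effect of that zero concrete, here is a minimal Java sketch of the comparison described above. It is not the throttle plugin's code: ThrottleSketch, isThrottled and countThrottled are hypothetical names, and the sketch ignores period and max_age entirely. It only counts events per key and tags an event once the count exceeds after_count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-key counting described above; not the plugin's implementation.
public class ThrottleSketch {
    private final int afterCount;
    private final Map<String, Integer> counts = new HashMap<>();

    public ThrottleSketch(int afterCount) {
        this.afterCount = afterCount;
    }

    /** Returns true when an event with this key would be tagged "throttled". */
    public boolean isThrottled(String key) {
        int count = counts.merge(key, 1, Integer::sum); // increment the counter for this key
        return count > afterCount;
    }

    /** Feeds a number of identical events through a fresh sketch and counts the tagged ones. */
    public static int countThrottled(int afterCount, int events) {
        ThrottleSketch sketch = new ThrottleSketch(afterCount);
        int throttled = 0;
        for (int i = 0; i < events; i++) {
            if (sketch.isThrottled("same message")) {
                throttled++;
            }
        }
        return throttled;
    }

    public static void main(String[] args) {
        System.out.println(countThrottled(3, 5)); // 2: only the 4th and 5th events are tagged
        System.out.println(countThrottled(0, 5)); // 5: with after_count = 0 every event is tagged
    }
}
```

With after_count effectively 0, the very first event for a key already has a count of 1, which is greater than 0, so the drop { } branch removes everything and nothing reaches the output.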
Add this Java plugin to logstash-core, then package it into logstash-core.jar:
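import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.PluginHelper;
// Assumption: RateLimiter is Guava's com.google.common.util.concurrent.RateLimiter.
import com.google.common.util.concurrent.RateLimiter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@LogstashPlugin(name = "java_rate_limit")
public class RateLimitFilter implements Filter {
    private static final Logger log = LogManager.getLogger(RateLimitFilter.class);
    public static final PluginConfigSpec<String> RATE_PATH = PluginConfigSpec.stringSetting("rate_path");
    public static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1);

    private final String id;
    private final String ratePath;
    private final boolean rateLimiterEnabled;
    // Written by the scheduler thread and read by pipeline workers, hence volatile.
    private volatile RateLimiter rateLimiter;
    // Only touched by the scheduler thread.
    private double lastRate;

    /**
     * Required constructor.
     *
     * @param id            Plugin id
     * @param configuration Logstash Configuration
     * @param context       Logstash Context
     */
    public RateLimitFilter(final String id, final Configuration configuration, final Context context) {
        this.id = id;
        this.ratePath = configuration.get(RATE_PATH);
        this.rateLimiterEnabled = ratePath != null && !ratePath.trim().isEmpty();
        if (rateLimiterEnabled) {
            // Re-read the rate file once per second.
            SCHEDULER.scheduleWithFixedDelay(this::updateRateLimiterIfRateChanged, 0, 1, TimeUnit.SECONDS);
        }
    }

    /** Reads the first line of the rate file and rebuilds the limiter when the value changed. */
    private void updateRateLimiterIfRateChanged() {
        double rate = -1D;
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(ratePath))) {
            String firstLine = bufferedReader.readLine();
            if (firstLine != null) {
                rate = Double.parseDouble(firstLine.trim());
            }
        } catch (Throwable e) {
            // Catch everything so a bad file never cancels the scheduled task.
            log.error("Get rate value failed!", e);
        }
        if (rate != lastRate) {
            if (rate <= 0D) {
                // A missing, unreadable, or non-positive rate disables limiting.
                rateLimiter = null;
                log.warn("# Rate is not positive, set RateLimiter to null! lastRate:[{}] rate:[{}] ratePath:[{}].", lastRate, rate, ratePath);
            } else {
                rateLimiter = RateLimiter.create(rate);
                log.warn("# Rate changed, set new RateLimiter! lastRate:[{}] rate:[{}] ratePath:[{}].", lastRate, rate, ratePath);
            }
            lastRate = rate;
        }
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, final FilterMatchListener filterMatchListener) {
        if (events == null || events.isEmpty()) {
            return events;
        }
        // Copy to a local so a concurrent reset to null cannot cause a NullPointerException.
        final RateLimiter limiter = rateLimiter;
        if (limiter != null) {
            // Blocks until one permit per event in the batch is available.
            limiter.acquire(events.size());
        }
        for (Event event : events) {
            filterMatchListener.filterMatched(event);
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return PluginHelper.commonFilterSettings(Arrays.asList(RATE_PATH));
    }

    @Override
    public String getId() {
        return id;
    }
}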
Then add the filter to your conf:
filter {
  # plugin name
  java_rate_limit {
    # rate in a text file
    rate_path => "/path/rate.txt"
  }
}
You can change the rate in the file, and the new value takes effect within 1 second.
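The rate file itself is just a number on the first line. As a rough illustration of how a file like that is interpreted, here is a standalone Java sketch that mirrors the parsing rule used in the plugin above: anything missing or unparseable becomes -1, which the plugin treats as "no limit". RateFileSketch, parseRate and readRate are hypothetical names introduced for this example, and the temp file stands in for /path/rate.txt.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper mirroring the plugin's "first line is the rate" rule.
public class RateFileSketch {
    /** Parses the first line of a rate file; returns -1 when it is missing or not a number. */
    public static double parseRate(String firstLine) {
        if (firstLine == null) {
            return -1D;
        }
        try {
            return Double.parseDouble(firstLine.trim());
        } catch (NumberFormatException e) {
            return -1D;
        }
    }

    /** Reads the rate from a file; returns -1 when the file is empty or unreadable. */
    public static double readRate(Path ratePath) {
        try {
            List<String> lines = Files.readAllLines(ratePath);
            return lines.isEmpty() ? -1D : parseRate(lines.get(0));
        } catch (IOException e) {
            return -1D;
        }
    }

    public static void main(String[] args) throws IOException {
        Path ratePath = Files.createTempFile("rate", ".txt"); // stand-in for /path/rate.txt
        Files.write(ratePath, "100\n".getBytes());
        System.out.println(readRate(ratePath)); // 100.0 events per second
        Files.write(ratePath, "oops\n".getBytes());
        System.out.println(readRate(ratePath)); // -1.0, which disables limiting
        Files.delete(ratePath);
    }
}
```

So writing a single number such as 100 into the file is enough, and emptying the file or writing a non-positive value turns the limiter off.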