I am reading the text file SMSSpamCollection with a Flume source and publishing it to a Kafka topic, which is the Flume sink.
# Agent Name:
a1.sources = r1
a1.sinks = sample
a1.channels = sample-channel
# Source configuration:
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /Users/val/Documents/code/spark/m11_to_Upload/SMSSpamCollection
a1.sources.r1.logStdErr = true
# Sink type
#a1.sinks.sample.type = logger
# Buffers events in memory to channel
a1.channels.sample-channel.type = memory
a1.channels.sample-channel.capacity = 1000
a1.channels.sample-channel.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = sample-channel
# Kafka sink settings: topic, broker list, and the channel the sink reads from
a1.sinks.sample.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sample.topic = sample_topic
a1.sinks.sample.brokerList = 127.0.0.1:9092
a1.sinks.sample.requiredAcks = 1
a1.sinks.sample.batchSize = 20
a1.sinks.sample.channel = sample-channel
I start the agent with this command:
flume-ng agent --conf conf --conf-file /usr/local/Cellar/flume/1.9.0/libexec/conf/flume-sample.conf -Dflume.root.logger=DEBUG,console --name a1 -Xmx512m -Xms256m
When I read the data back from the Kafka topic with
kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092
I only see the last 10 records of the original file:
ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
ham Ard 6 like dat lor.
ham Why don't you wait 'til at least wednesday to see if you get your .
ham Huh y lei...
spam REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
spam This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
ham Will ü b going to esplanade fr home?
ham Pity, * was in mood for that. So...any other suggestions?
ham The guy did some bitching but I acted like i'd be interested in buying something else next week and he gave it to us for free
ham Rofl. Its true to its name
What is the correct way to get all of the records?
You are using tail, which by default prints only the last 10 lines of a file. Use this instead:
a1.sources.r1.command = tail -c +0 -f /Users/val/Documents/code/spark/m11_to_Upload/SMSSpamCollection
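-c +0 tells tail to start at the first byte of the file, so every existing line is emitted before -f begins following new ones.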
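To see the difference outside of Flume, here is a small sketch that compares the two invocations on a throwaway file. The 25-line temp file is a hypothetical stand-in for SMSSpamCollection, and -f is left out so the commands exit. The line-based form tail -n +1 is included as what should be an equivalent alternative.

```shell
# Build a 25-line sample file (hypothetical stand-in for SMSSpamCollection).
sample=$(mktemp)
seq 1 25 > "$sample"

# Default tail: only the last 10 lines are printed.
default_count=$(tail "$sample" | wc -l | tr -d ' ')

# Byte offset +0: output starts at the beginning of the file.
full_count=$(tail -c +0 "$sample" | wc -l | tr -d ' ')

# Line offset +1: output starts at the first line.
line_count=$(tail -n +1 "$sample" | wc -l | tr -d ' ')

echo "default=$default_count from_byte_0=$full_count from_line_1=$line_count"
rm -f "$sample"
```

If the first count is 10 and the other two match the file length, the same -c +0 (or -n +1) option placed before -f in a1.sources.r1.command should make the exec source emit the whole file.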
By the way, an alternative is to use Kafka Connect with a plugin such as Spooldir or File Pulse.