I am currently trying to use Hadoop Streaming. I have a file called diamonds.txt that contains the carat value of each diamond with its price next to it, all comma-separated (CSV).
Example of the first five lines of the file:
carat,price
0.23,326
0.21,326
0.23,327
0.29,334
I also have two Python files for running MapReduce on the diamonds.txt file.
mapper.py:
#!/usr/bin/env python
import sys
for line in sys.stdin:
    # Split the input line into a list of values
    values = line.strip().split(',')
    # Extract the carat and price values
    carat = float(values[0])
    price = float(values[1])
    # Write the key-value pair to stdout
    print(f"{carat}\t{price}")
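For reference, this is roughly how I check what the mapper should emit, outside of Hadoop (a minimal standalone sketch; the header line is left out by hand and only the four example data rows from above are used):

# Standalone sketch of the mapper logic on the sample rows above
sample = ["0.23,326", "0.21,326", "0.23,327", "0.29,334"]
for line in sample:
    carat, price = (float(v) for v in line.strip().split(','))
    print(f"{carat}\t{price}")  # same tab-separated key/value pair the mapper writes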
reducer.py:
#!/usr/bin/env python
import sys
# Initialize variables
sum_x = 0
sum_y = 0
sum_xy = 0
sum_x_squared = 0
sum_y_squared = 0
n = 0
for line in sys.stdin:
    # Split the input line into a list of values
    values = line.strip().split('\t')
    # Extract the carat and price values
    carat = float(values[0])
    price = float(values[1])
    # Update the sums and counts
    sum_x += carat
    sum_y += price
    sum_xy += carat * price
    sum_x_squared += carat ** 2
    sum_y_squared += price ** 2
    n += 1
# Calculate the correlation coefficient
numerator = (n * sum_xy) - (sum_x * sum_y)
denominator = ((n * sum_x_squared) - (sum_x ** 2)) * ((n * sum_y_squared) - (sum_y ** 2))
correlation_coefficient = numerator / denominator ** 0.5
# Write the correlation coefficient to stdout
print(correlation_coefficient)
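As a sanity check, the same Pearson-correlation arithmetic that reducer.py performs can be run standalone on the four example rows (a minimal sketch using only the sample values above, not the full diamonds.txt):

# Standalone sanity check of the reducer's correlation formula on the sample rows
pairs = [(0.23, 326.0), (0.21, 326.0), (0.23, 327.0), (0.29, 334.0)]
n = len(pairs)
sum_x = sum(x for x, _ in pairs)
sum_y = sum(y for _, y in pairs)
sum_xy = sum(x * y for x, y in pairs)
sum_x_squared = sum(x ** 2 for x, _ in pairs)
sum_y_squared = sum(y ** 2 for _, y in pairs)
numerator = n * sum_xy - sum_x * sum_y
denominator = (n * sum_x_squared - sum_x ** 2) * (n * sum_y_squared - sum_y ** 2)
print(numerator / denominator ** 0.5)  # Pearson correlation of carat vs. price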
When I run the command: hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.4.jar -file ./mapper.py -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py -input hdfs://localhost:9000/CA/diamonds.txt -output hdfs://localhost:9000/output20
I get this error message:
2023-04-17 13:28:22,062 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py] [] /tmp/streamjob75540364628648902.jar tmpDir=null
2023-04-17 13:28:22,639 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-04-17 13:28:22,695 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-04-17 13:28:22,695 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-04-17 13:28:22,705 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-04-17 13:28:22,857 INFO mapred.FileInputFormat: Total input files to process : 1
2023-04-17 13:28:22,883 INFO mapreduce.JobSubmitter: number of splits:1
2023-04-17 13:28:22,984 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local401521236_0001
2023-04-17 13:28:22,984 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-04-17 13:28:23,096 INFO mapred.LocalDistributedCacheManager: Localized file:/home/hduser/Desktop/CA/mapper.py as file:/tmp/hadoop-hduser/mapred/local/job_local401521236_0001_edf78da7-1b82-4a4d-8c4d-218d620e563f/mapper.py
2023-04-17 13:28:23,098 INFO mapred.LocalDistributedCacheManager: Localized file:/home/hduser/Desktop/CA/reducer.py as file:/tmp/hadoop-hduser/mapred/local/job_local401521236_0001_9199da8c-1ef8-49bb-9926-1af65410256b/reducer.py
2023-04-17 13:28:23,132 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-04-17 13:28:23,133 INFO mapreduce.Job: Running job: job_local401521236_0001
2023-04-17 13:28:23,135 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-04-17 13:28:23,136 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
2023-04-17 13:28:23,138 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-04-17 13:28:23,138 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-04-17 13:28:23,168 INFO mapred.LocalJobRunner: Waiting for map tasks
2023-04-17 13:28:23,177 INFO mapred.LocalJobRunner: Starting task: attempt_local401521236_0001_m_000000_0
2023-04-17 13:28:23,191 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-04-17 13:28:23,191 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-04-17 13:28:23,201 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2023-04-17 13:28:23,205 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/CA/diamonds.txt:0+567047
2023-04-17 13:28:23,218 INFO mapred.MapTask: numReduceTasks: 1
2023-04-17 13:28:23,302 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2023-04-17 13:28:23,302 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2023-04-17 13:28:23,302 INFO mapred.MapTask: soft limit at 83886080
2023-04-17 13:28:23,302 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2023-04-17 13:28:23,302 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2023-04-17 13:28:23,305 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2023-04-17 13:28:23,310 INFO streaming.PipeMapRed: PipeMapRed exec [/home/hduser/Desktop/CA/./mapper.py]
2023-04-17 13:28:23,312 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2023-04-17 13:28:23,313 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2023-04-17 13:28:23,313 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2023-04-17 13:28:23,313 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2023-04-17 13:28:23,314 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2023-04-17 13:28:23,314 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2023-04-17 13:28:23,314 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2023-04-17 13:28:23,314 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2023-04-17 13:28:23,314 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2023-04-17 13:28:23,314 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2023-04-17 13:28:23,315 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
2023-04-17 13:28:23,315 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
/usr/bin/env: ‘python\r’: No such file or directory
2023-04-17 13:28:23,338 INFO streaming.PipeMapRed: MRErrorThread done
2023-04-17 13:28:23,531 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2023-04-17 13:28:23,531 INFO streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2023-04-17 13:28:23,532 INFO streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2023-04-17 13:28:23,536 INFO streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2023-04-17 13:28:23,544 INFO streaming.PipeMapRed: R/W/S=10000/0/0 in:NA [rec/s] out:NA [rec/s]
2023-04-17 13:28:23,547 INFO streaming.PipeMapRed: R/W/S=13841/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 HOST=null
USER=hduser
HADOOP_USER=null
last tool output: |null|
java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:23,548 WARN streaming.PipeMapRed: {}
java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:532)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:120)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:23,548 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:120)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:23,549 WARN streaming.PipeMapRed: {}
java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:532)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:23,549 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:23,549 INFO mapred.LocalJobRunner: map task executor complete.
2023-04-17 13:28:23,727 WARN mapred.LocalJobRunner: job_local401521236_0001
java.lang.Exception: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2023-04-17 13:28:24,137 INFO mapreduce.Job: Job job_local401521236_0001 running in uber mode : false
2023-04-17 13:28:24,138 INFO mapreduce.Job: map 0% reduce 0%
2023-04-17 13:28:24,139 INFO mapreduce.Job: Job job_local401521236_0001 failed with state FAILED due to: NA
2023-04-17 13:28:24,143 INFO mapreduce.Job: Counters: 0
2023-04-17 13:28:24,143 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
How can I get this to work? When the three files (diamonds.txt, mapper.py, and reducer.py) are in the same folder, I can get the desired result. Thank you.