我已经在代码条识别项目上工作了几周。我被要求使用 GIGE 相机来识别 PCB 上的条码,我选择使用 python 来完成这项工作。 至此,我已经完成了Opencv对图片中codebar的识别。问题是如何连接到 GIGE 相机并使用“我的程序”抓取照片。 不幸的是,我发现 Opencv 不支持 GIGE 相机,所以我不得不选择 Halcon。然而,尽管我可以使用 HDevelop 连接并捕获图像,但我找不到将其链接到我的 Python 程序的解决方案,因为 Halcon 程序只能导出为 C# 或 C++
顺便说一句,我尝试使用 pythonnet 和 ironPython,但我不知道如何使用它们来执行 C# 脚本(.cs 文件)
我在使用 Python 连接 GigE 相机时也遇到了同样的困难。值得庆幸的是,我找到了这个名为 Harvesters 的库。使用采集器和 OpenCV,您可以从 GigE 相机捕获图像并对其进行处理。您可以查看他们的文档这里。
要使用 Harvesters 连接到相机,您首先需要安装库:
pip install harvesters
在此之后,您将需要一个 GenTL 制作者来抓取图像。您将使用您下载的任何生产商 .CTI 文件。我个人使用 Matrix Vision mvAquire,因为它是免费的并且没有供应商锁定。在此处查看更多信息CTI 文件信息
完成这些先决条件后,您可以通过执行以下操作连接到相机:
from harvesters.core import Harvester
h = Harvester()
h.add_file('path/to/foo.cti')
h.update()
ia = h.create(0) # Connect to first camera in device_info_list
ia.start()
with ia.fetch_buffer() as buffer:
component = buffer.payload.components[0]
_2d = component.data.reshape(component.height,component.width, int(component.num_components_per_pixel))
# Do any processing on the image data here...
ia.stop()
ia.destory()
h.reset()
我为此苦苦挣扎,但我偶然发现了这个方法。我有一台 IDS 工业视觉相机 (IDS GV-5860-CP),它具有受支持的 Python 库。 IDS Peak IPL SDK 具有将图像转换为 NumPy 3D 数组的扩展。
我的代码与相机建立连接并访问相机的数据流。该数据流用转换为图像的数据填充缓冲区。此转换需要已知的 RGB 格式。该数据以 RGB 格式写入,并以数组形式排列。这些数组可以转换为 NumPy 3D 数组。该数组可供 OpenCV 访问,并且可以显示为图像。
大多数 Gige Vision 相机都使用缓冲区。请小心,因为缓冲区可能会导致延迟。如果将获取的缓冲区转换为图像(未写入,写入图像需要大量处理能力),则只需在 NumPy 3D 数组中更改转换后的图像即可获取可在 OpenCV 窗口中显示的图像。
这是我使用IDS工业相机的代码,希望对您自己的项目有所帮助。
我的代码:
import numpy as np
import cv2
import sys
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl as ipl
from ids_peak import ids_peak_ipl_extension
m_device = None
m_dataStream = None
m_node_map_remote_device = None
out = None
def open_camera():
print("connection- camera")
global m_device, m_node_map_remote_device
try:
# Create instance of the device manager
device_manager = peak.DeviceManager.Instance()
# Update the device manager
device_manager.Update()
# Return if no device was found
if device_manager.Devices().empty():
return False
# open the first openable device in the device manager's device list
device_count = device_manager.Devices().size()
for i in range(device_count):
if device_manager.Devices()[i].IsOpenable():
m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)
# Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
min_frame_rate = 0
max_frame_rate = 50
inc_frame_rate = 0
# Get frame rate range. All values in fps.
min_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Minimum()
max_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Maximum()
if m_node_map_remote_device.FindNode("AcquisitionFrameRate").HasConstantIncrement():
inc_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Increment()
else:
# If there is no increment, it might be useful to choose a suitable increment for a GUI control element (e.g. a slider)
inc_frame_rate = 0.1
# Get the current frame rate
frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Value()
# Set frame rate to maximum
m_node_map_remote_device.FindNode("AcquisitionFrameRate").SetValue(max_frame_rate)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by connection camera")
return False
def prepare_acquisition():
print("opening stream")
global m_dataStream
try:
data_streams = m_device.DataStreams()
if data_streams.empty():
print("no stream possible")
# no data streams available
return False
m_dataStream = m_device.DataStreams()[0].OpenDataStream()
print("open stream")
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by prep acquisition")
return False
def set_roi(x, y, width, height):
print("setting ROI")
try:
# Get the minimum ROI and set it. After that there are no size restrictions anymore
x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
w_min = m_node_map_remote_device.FindNode("Width").Minimum()
h_min = m_node_map_remote_device.FindNode("Height").Minimum()
m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
m_node_map_remote_device.FindNode("Width").SetValue(w_min)
m_node_map_remote_device.FindNode("Height").SetValue(h_min)
# Get the maximum ROI values
x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
w_max = m_node_map_remote_device.FindNode("Width").Maximum()
h_max = m_node_map_remote_device.FindNode("Height").Maximum()
if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
print("Error x and y values")
return False
elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
print("Error width and height")
return False
else:
# Now, set final AOI
m_node_map_remote_device.FindNode("OffsetX").SetValue(x)
m_node_map_remote_device.FindNode("OffsetY").SetValue(y)
m_node_map_remote_device.FindNode("Width").SetValue(width)
m_node_map_remote_device.FindNode("Height").SetValue(height)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by setting ROI")
print(str_error)
return False
def alloc_and_announce_buffers():
print("allocating buffers")
try:
if m_dataStream:
# Flush queue and prepare all buffers for revoking
m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
# Clear all old buffers
for buffer in m_dataStream.AnnouncedBuffers():
m_dataStream.RevokeBuffer(buffer)
payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()
# Get number of minimum required buffers
num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()
# Alloc buffers
for count in range(num_buffers_min_required):
buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
m_dataStream.QueueBuffer(buffer)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by allocating buffers")
print(str_error)
return False
def start_acquisition():
print("Start acquisition")
try:
m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
return True
except Exception as e:
# ...
str_error = str(e)
print(str_error)
return False
def saving_acquisition():
fourcc = cv2.VideoWriter_fourcc('W','M','V','2')
out = cv2.VideoWriter( "video", fourcc, 50, (1936, 1096))
while True:
try:
# Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
buffer = m_dataStream.WaitForFinishedBuffer(5000)
image = ids_peak_ipl_extension.BufferToImage(buffer)
# Create IDS peak IPL image for debayering and convert it to RGBa8 format
image_processed = image.ConvertTo(ipl.PixelFormatName_BGR8)
# Queue buffer again
m_dataStream.QueueBuffer(buffer)
image_python = image_processed.get_numpy_3D()
frame = image_python
out.write(frame)
cv2.imshow('videoview',frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
except Exception as e:
# ...
str_error = str(e)
print("Error by saving acquisition")
print(str_error)
return False
def main():
# initialize library
peak.Library.Initialize()
if not open_camera():
# error
sys.exit(-1)
if not prepare_acquisition():
# error
sys.exit(-2)
if not alloc_and_announce_buffers():
# error
sys.exit(-3)
if not start_acquisition():
# error
sys.exit(-4)
if not saving_acquisition():
out.release()
cv2.destroyAllWindows()
print("oke")
# error
peak.Library.Close()
print('executed')
sys.exit(0)
if __name__ == '__main__':
main()
我做了与上一张海报类似的事情,但注意到由于某种原因返回框架不起作用。
def saving_acquisition():
...
image_python = image_processed.get_numpy_3D()
return image_python
img = saving_acquisition()
# This results in an unknown C error.
cv2.imshow('image',img )
如果您创建 ndarray 的副本并返回它,它会出于某种原因起作用。
def saving_acquisition():
...
image_python = image_processed.get_numpy_3D()
return image_python.copy()
img = saving_acquisition()
# This works.
cv2.imshow('image',img )
任何愿意启发我解释为什么会出现这种情况的人将不胜感激。