如何使用Python从GIGE相机读取和捕获图像?

问题描述 投票:0回答:3

我已经在代码条识别项目上工作了几周。我被要求使用 GIGE 相机来识别 PCB 上的条码,我选择使用 python 来完成这项工作。 至此,我已经完成了Opencv对图片中codebar的识别。问题是如何连接到 GIGE 相机并使用“我的程序”抓取照片。 不幸的是,我发现 Opencv 不支持 GIGE 相机,所以我不得不选择 Halcon。然而,尽管我可以使用 HDevelop 连接并捕获图像,但我找不到将其链接到我的 Python 程序的解决方案,因为 Halcon 程序只能导出为 C# 或 C++

顺便说一句,我尝试使用 pythonnet 和 ironPython,但我不知道如何使用它们来执行 C# 脚本(.cs 文件)

python opencv halcon gige-sdk
3个回答
1
投票

我在使用 Python 连接 GigE 相机时也遇到了同样的困难。值得庆幸的是,我找到了这个名为 Harvesters 的库。使用采集器和 OpenCV,您可以从 GigE 相机捕获图像并对其进行处理。您可以查看他们的文档这里

要使用 Harvesters 连接到相机,您首先需要安装库:

pip install harvesters

在此之后,您将需要一个 GenTL 制作者来抓取图像。您将使用您下载的任何生产商 .CTI 文件。我个人使用 Matrix Vision mvAquire,因为它是免费的并且没有供应商锁定。在此处查看更多信息CTI 文件信息

完成这些先决条件后,您可以通过执行以下操作连接到相机:

from harvesters.core import Harvester
h = Harvester()
h.add_file('path/to/foo.cti')
h.update()
ia = h.create(0) # Connect to first camera in device_info_list

ia.start()
with ia.fetch_buffer() as buffer:
            component = buffer.payload.components[0]
            _2d = component.data.reshape(component.height,component.width, int(component.num_components_per_pixel))
            # Do any processing on the image data here...

ia.stop()
ia.destory()
h.reset()

0
投票

我为此苦苦挣扎,但我偶然发现了这个方法。我有一台 IDS 工业视觉相机 (IDS GV-5860-CP),它具有受支持的 Python 库。 IDS Peak IPL SDK 具有将图像转换为 NumPy 3D 数组的扩展。

我的代码与相机建立连接并访问相机的数据流。该数据流用转换为图像的数据填充缓冲区。此转换需要已知的 RGB 格式。该数据以 RGB 格式写入,并以数组形式排列。这些数组可以转换为 NumPy 3D 数组。该数组可供 OpenCV 访问,并且可以显示为图像。

大多数 Gige Vision 相机都使用缓冲区。请小心,因为缓冲区可能会导致延迟。如果将获取的缓冲区转换为图像(未写入,写入图像需要大量处理能力),则只需在 NumPy 3D 数组中更改转换后的图像即可获取可在 OpenCV 窗口中显示的图像。

这是我使用IDS工业相机的代码,希望对您自己的项目有所帮助。

我的代码:

import numpy as np 
import cv2
import sys

from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl as ipl
from ids_peak import ids_peak_ipl_extension






m_device = None
m_dataStream = None
m_node_map_remote_device = None
out = None


def open_camera():
  print("connection- camera")
  global m_device, m_node_map_remote_device
  try:
      # Create instance of the device manager
    device_manager = peak.DeviceManager.Instance()
 
      # Update the device manager
    device_manager.Update()
 
      # Return if no device was found
    if device_manager.Devices().empty():
      return False
 
      # open the first openable device in the device manager's device list
    device_count = device_manager.Devices().size()
    for i in range(device_count):
        if device_manager.Devices()[i].IsOpenable():
            m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)
 
              # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
            m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
            min_frame_rate = 0
            max_frame_rate = 50
            inc_frame_rate = 0

            
            # Get frame rate range. All values in fps.
            min_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Minimum()
            max_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Maximum()
            
            if m_node_map_remote_device.FindNode("AcquisitionFrameRate").HasConstantIncrement():
                inc_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Increment()
            else:
                # If there is no increment, it might be useful to choose a suitable increment for a GUI control element (e.g. a slider)
                inc_frame_rate = 0.1
            
            # Get the current frame rate
            frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Value()
            
            # Set frame rate to maximum
            m_node_map_remote_device.FindNode("AcquisitionFrameRate").SetValue(max_frame_rate)
        return True
  except Exception as e:
      # ...
    str_error = str(e)
    print("Error by connection camera")
    return False
 
 
def prepare_acquisition():
  print("opening stream")
  global m_dataStream

  try:
    data_streams = m_device.DataStreams()
    if data_streams.empty():
      print("no stream possible")
      # no data streams available
      return False
 
    m_dataStream = m_device.DataStreams()[0].OpenDataStream()
    print("open stream")
 
    return True
  except Exception as e:
      # ...
      str_error = str(e)
      print("Error by prep acquisition")
      return False
 
 
def set_roi(x, y, width, height):
  print("setting ROI")
  try:
      # Get the minimum ROI and set it. After that there are no size restrictions anymore
    x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
    y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
    w_min = m_node_map_remote_device.FindNode("Width").Minimum()
    h_min = m_node_map_remote_device.FindNode("Height").Minimum()
 
    m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
    m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
    m_node_map_remote_device.FindNode("Width").SetValue(w_min)
    m_node_map_remote_device.FindNode("Height").SetValue(h_min)
 
      # Get the maximum ROI values
    x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
    y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
    w_max = m_node_map_remote_device.FindNode("Width").Maximum()
    h_max = m_node_map_remote_device.FindNode("Height").Maximum()
 
    if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
      print("Error x and y values")
      return False
    elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
      print("Error width and height")
      return False
    else:
          # Now, set final AOI
        m_node_map_remote_device.FindNode("OffsetX").SetValue(x)
        m_node_map_remote_device.FindNode("OffsetY").SetValue(y)
        m_node_map_remote_device.FindNode("Width").SetValue(width)
        m_node_map_remote_device.FindNode("Height").SetValue(height)
 
        return True
  except Exception as e:
      # ...
       str_error = str(e)
       print("Error by setting ROI")
       print(str_error)
       return False
 
 
def alloc_and_announce_buffers():
  print("allocating buffers")
  try:
    if m_dataStream:
          # Flush queue and prepare all buffers for revoking
        m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
 
          # Clear all old buffers
        for buffer in m_dataStream.AnnouncedBuffers():
            m_dataStream.RevokeBuffer(buffer)
 
        payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()
 
          # Get number of minimum required buffers
        num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()
 
          # Alloc buffers
        for count in range(num_buffers_min_required):
            buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
            m_dataStream.QueueBuffer(buffer)
 
        return True
  except Exception as e:
      # ...
    str_error = str(e)
    print("Error by allocating buffers")
    print(str_error)
    return False
 
 
def start_acquisition():
  print("Start acquisition")

  try:
    m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
    m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
    m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
       
    return True
  except Exception as e:
      # ...
      str_error = str(e)
      print(str_error)
      return False

def saving_acquisition():  
  fourcc = cv2.VideoWriter_fourcc('W','M','V','2')
  out = cv2.VideoWriter( "video", fourcc, 50, (1936,  1096))
  while True:
    try:
      
      # Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
      buffer = m_dataStream.WaitForFinishedBuffer(5000)

      image = ids_peak_ipl_extension.BufferToImage(buffer)
        
      # Create IDS peak IPL image for debayering and convert it to RGBa8 format
            
      image_processed = image.ConvertTo(ipl.PixelFormatName_BGR8)
      # Queue buffer again
      m_dataStream.QueueBuffer(buffer)
        
      image_python = image_processed.get_numpy_3D()

      frame = image_python
    
      out.write(frame)
      cv2.imshow('videoview',frame)
      
      key = cv2.waitKey(1)
      if key == ord('q'):
        break

      
    except Exception as e:
      # ...
      str_error = str(e)
      print("Error by saving acquisition")
      print(str_error)
      return False

 
def main():
  
  # initialize library
  peak.Library.Initialize()
 
  if not open_camera():
    # error
    sys.exit(-1)
 
  if not prepare_acquisition():
    # error
    sys.exit(-2)
 
  if not alloc_and_announce_buffers():
    # error
    sys.exit(-3)
 
  if not start_acquisition():
    # error
    sys.exit(-4)

  if not saving_acquisition():
    out.release()
    cv2.destroyAllWindows()
    print("oke")
    # error
 
  peak.Library.Close()
  print('executed')
  sys.exit(0)
 
if __name__ == '__main__':
  main()

0
投票

我做了与上一张海报类似的事情,但注意到由于某种原因返回框架不起作用。

def saving_acquisition():  
  ...
  image_python = image_processed.get_numpy_3D()
  return image_python

img = saving_acquisition()

# This results in an unknown C error.
cv2.imshow('image',img )

如果您创建 ndarray 的副本并返回它,它会出于某种原因起作用。

def saving_acquisition():  
  ...
  image_python = image_processed.get_numpy_3D()
  return image_python.copy()

img = saving_acquisition()

# This works.
cv2.imshow('image',img )

任何愿意启发我解释为什么会出现这种情况的人将不胜感激。

© www.soinside.com 2019 - 2024. All rights reserved.