从 Cypher 查询的输出中排除 `null` 或空 [] 值的最佳方法

问题描述 投票:0回答:1

背景 我已经设置了一个 neo4j 向量索引,并将其与 Cypher 查询结合使用,以获取特定节点并遍历它们的路径来返回特定数据。鉴于这些数据将传递给大语言模型(LLM)进行总结,输出应该尽可能干净。

功能

函数 `generate_employee_cypher_query(schema)` 通过遵循架构/本体路径,从实体 `Employee` 出发获取特定路径:

def generate_employee_cypher_query(schema):
    """Build the Cypher query that fetches employees and their schema-defined neighbours.

    The query starts with a vector-index lookup restricted to ``Employee``
    nodes whose ``user_id`` equals ``$user_id``, adds one ``OPTIONAL MATCH``
    per relationship listed in ``schema["Employee"]["relationships"]``, and
    returns ``employeeName``, ``score``, ``employeeJson`` and
    ``connectionsJson`` (the last two serialised with ``apoc.convert.toJson``).

    Relationship types and node labels are interpolated verbatim into the
    query text, so ``schema`` must come from a trusted source.

    Args:
        schema: Mapping with an ``"Employee"`` entry whose ``"relationships"``
            value is a re-iterable collection of dicts. Each dict must provide
            ``"type"``, ``"endNode"`` and ``"startNode"``; ``"cardinality"``
            is optional and defaults to ``"0..n"``.

    Returns:
        The query text. It references the parameters ``$index_name``,
        ``$n_results``, ``$query_embedding`` and ``$user_id``.

    Raises:
        KeyError: If ``schema`` lacks ``"Employee"`` / ``"relationships"`` or
            a relationship dict lacks one of the required keys.
    """
    relationships = schema["Employee"]["relationships"]

    # Property keys removed from every node map before it is serialised.
    # Kept in one place so the employee map and the connection maps agree.
    hidden_keys = (
        "['embedding', 'id', 'elementId', 'user_id', "
        "'timestamp', 'created', 'updated']"
    )
    # End-node labels that get an extra hop to their TimeFrame node.
    timeframe_labels = ("Availability", "Unavailability")
    # Cardinalities for which only the first collected element is returned.
    single_cardinalities = ("1..1", "0..1")

    # Initial vector search
    base_query = """
    CALL db.index.vector.queryNodes($index_name, $n_results, $query_embedding)
    YIELD node AS employee, score
    WHERE employee:Employee AND employee.user_id = $user_id
    
    MATCH (employee)
    """

    # One OPTIONAL MATCH per relationship; the lower-cased relationship type
    # doubles as the Cypher variable name of the node on the far side.
    match_clauses = []
    for rel in relationships:
        rel_type = rel["type"]
        end_node = rel["endNode"]
        var = rel_type.lower()

        if rel["startNode"] != "Employee":
            # Incoming relationship to Employee.
            # NOTE(review): the far node is labelled with endNode here; if
            # endNode is "Employee" for incoming relationships, the label
            # should probably come from startNode -- confirm against the schema.
            match_clauses.append(
                f"OPTIONAL MATCH ({var}:{end_node})"
                f"-[{var}Rel:{rel_type}]->(employee)"
            )
            continue

        # Outgoing relationship from Employee.
        match_clauses.append(
            f"OPTIONAL MATCH (employee)-[{var}Rel:{rel_type}]->"
            f"({var}:{end_node})"
        )
        if end_node in timeframe_labels:
            match_clauses.append(
                f"OPTIONAL MATCH ({var})-[:HAS_TIMEFRAME]->"
                f"({var}TimeFrame:TimeFrame)"
            )

    # Leading RETURN columns: name, score and the cleaned employee map.
    employee_block = f"""
    employee.name AS employeeName,
    score,
    apoc.convert.toJson(
        CASE WHEN employee IS NOT NULL 
        THEN apoc.map.removeKeys(properties(employee), 
            {hidden_keys}
        )
        ELSE {{}} 
        END
    ) AS employeeJson,
    """

    # One map per relationship inside the connections array.
    connection_blocks = []
    for rel in relationships:
        rel_type = rel["type"]
        end_node = rel["endNode"]
        var = rel_type.lower()

        # Single-valued relationships are unwrapped from the collected list.
        is_single = rel.get("cardinality", "0..n") in single_cardinalities
        collection_suffix = "[0]" if is_single else ""

        if end_node in timeframe_labels:
            # Timeframe relationships return a hand-built map, keyed by the
            # lower-cased relationship type.
            time_frame = f"{var}TimeFrame"
            connection_blocks.append(f"""{{
                type: '{rel_type}',
                {var}: collect(DISTINCT CASE WHEN {var} IS NOT NULL 
                    THEN {{
                        employeeName: {var}.employeeName,
                        timeframe: CASE WHEN {time_frame} IS NOT NULL 
                            THEN {{
                                dateIndicator: {time_frame}.dateIndicator,
                                type: {time_frame}.type,
                                recurring: {time_frame}.recurring
                            }}
                            ELSE null 
                        END
                    }}
                    ELSE null END){collection_suffix}
            }}""")
        else:
            # Every other relationship returns the node's properties minus the
            # hidden keys, keyed by the lower-cased end-node label.
            connection_blocks.append(f"""{{
                type: '{rel_type}',
                {end_node.lower()}: collect(DISTINCT CASE WHEN {var} IS NOT NULL 
                    THEN apoc.map.removeKeys(properties({var}), 
                        {hidden_keys}
                    )
                    ELSE null END){collection_suffix}
            }}""")

    return_clause = "\n".join(
        [
            employee_block,
            "apoc.convert.toJson([",
            ",\n".join(connection_blocks),
            "]) AS connectionsJson",
        ]
    )

    return base_query + "\n".join(match_clauses) + "\nRETURN " + return_clause

现在,这个特定函数的输出可以在下面的示例中看到:

Employee Vector Search Results:
[
  {
    "employeeName": "Emma Williams",
    "score": 0.6321649551391602,
    "employee": {
      "name": "Emma Williams",
      "email": "[email protected]"
    },
    "connections": [
      {
        "contract": {
          "contractType": "Part-time"
        },
        "type": "HAS_CONTRACT_TYPE"
      },
      {
        "has_unavailability": [],
        "type": "HAS_UNAVAILABILITY"
      },
      {
        "has_availability": [
          {
            "employeeName": "Emma Williams",
            "timeframe": {
              "recurring": true,
              "dateIndicator": "Thu-Sat 16:00-00:00",
              "type": "DayOfWeek"
            }
          }
        ],
        "type": "HAS_AVAILABILITY"
      },
      {
        "team": {
          "name": "F&B"
        },
        "type": "BELONGS_TO"
      },
      {
        "education": [],
        "type": "HAS_EDUCATION"
      },
      {
        "type": "HAS_CERTIFICATION",
        "certification": [
          {
            "name": "Alcohol Service"
          },
          {
            "name": "Mixology Certificate"
          }
        ]
      },

注意上面输出中

"education": [],
"has_unavailability": [],
中的空列表。我希望这些值不包含在输出中。

提前谢谢你 曼努埃尔

我尝试使用 CASE 表达式或 apoc.map.clean 但似乎没有得到我想要的结果。

neo4j cypher neo4j-python-driver
1个回答
0
投票

直接对 `Employee Vector Search Results` 的 JSON 结果做后处理可能会更简单。

例如,下面是一个示例函数,它从每个

connections
列表中过滤掉任何具有
type
键和另一个值为空列表的键的对象:

def filter_employee_connections(json_result):
    """Remove connection entries that carry an empty list.

    Args:
        json_result: JSON text holding a list of employee records, each with
            a ``"connections"`` list of dicts.

    Returns:
        The records re-serialised as JSON (2-space indent). A connection is
        dropped when any of its values, other than the one stored under
        ``"type"``, is an empty list.
    """

    def _carries_empty_list(connection):
        # The "type" entry is never inspected, whatever its value is.
        for key, value in connection.items():
            if key == "type":
                continue
            if isinstance(value, list) and not value:
                return True
        return False

    employees = json.loads(json_result)
    for record in employees:
        kept = []
        for connection in record["connections"]:
            if not _carries_empty_list(connection):
                kept.append(connection)
        record["connections"] = kept
    return json.dumps(employees, indent=2)

生成的 JSON 看起来像这样:

[
  {
    "employeeName": "Emma Williams",
    "score": 0.6321649551391602,
    "employee": {
      "name": "Emma Williams",
      "email": "[email protected]"
    },
    "connections": [
      {
        "contract": {
          "contractType": "Part-time"
        },
        "type": "HAS_CONTRACT_TYPE"
      },
      {
        "has_availability": [
          {
            "employeeName": "Emma Williams",
            "timeframe": {
              "recurring": true,
              "dateIndicator": "Thu-Sat 16:00-00:00",
              "type": "DayOfWeek"
            }
          }
        ],
        "type": "HAS_AVAILABILITY"
      },
      {
        "team": {
          "name": "F&B"
        },
        "type": "BELONGS_TO"
      },
      {
        "type": "HAS_CERTIFICATION",
        "certification": [
          {
            "name": "Alcohol Service"
          },
          {
            "name": "Mixology Certificate"
          }
        ]
      }
    ]
  }
]
© www.soinside.com 2019 - 2024. All rights reserved.