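I have an Airflow cluster on Kubernetes, maintained by the SRE team. The cluster uses the service account sa_project_a, created in project_a, to authenticate and access GCP projects. I need to implement a Python script that extracts data from BigQuery in project_a and then writes it to a MySQL database hosted on Cloud SQL in project_b.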
sa_project_a has been granted the following roles:
BigQuery Data Viewer
BigQuery Job User
Cloud SQL Client
Secret Manager Secret Accessor
The part that extracts data from BigQuery has no permission issues.
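
    from google.cloud import bigquery
    import pandas as pd

    def bigquery_to_df(self, query: str) -> None:
        """Run a SQL query on BigQuery and store the result as a pandas DataFrame.

        Args:
            query (str): SQL query string
        """
        # With no arguments, the client uses the environment's default credentials.
        client = bigquery.Client()
        query_job = client.query(query)
        rows = query_job.result()  # blocks until the query finishes
        data = [dict(row) for row in rows]
        self.df = pd.DataFrame(data=data)
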
To connect to Cloud SQL, however, I am using the cloud-sql-python-connector Python library.

    import logging
    import os

    import pymysql
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes

    def create_connect(
        self,
        connection_name: str,
        db_user: str,
        db_pass: str,
        db_name: str,
        database: str,
        lib: str,
    ) -> None:
        """Create the connection to a database on Cloud SQL.

        Args:
            connection_name (str): Connection name of the Cloud SQL instance
            db_user (str): User id
            db_pass (str): Password
            db_name (str): Database name on the Cloud SQL instance
            database (str): Database software hosted on Cloud SQL.
            lib (str): Database library used to connect to the database.
        """
        ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC
        # No credentials are passed, so the connector uses the default ones.
        connector = Connector(ip_type)

        def _get_conn() -> pymysql.connections.Connection:
            """Create the pymysql connection to the database on Cloud SQL.

            Returns:
                pymysql.connections.Connection: the pymysql connection
            """
            conn = connector.connect(
                instance_connection_string=connection_name,
                driver=lib,
                user=db_user,
                password=db_pass,
                db=db_name,
            )
            return conn

        self.db_name = db_name
        # SQLAlchemy calls _get_conn whenever it needs a new DBAPI connection.
        self._pool = sqlalchemy.create_engine(
            f"{database}+{lib}://",
            creator=_get_conn,
        ).connect()
        logging.info(" --> Start the connection!")

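A side note on the ip_type line: os.environ.get("PRIVATE_IP") is truthy for any non-empty string, so PRIVATE_IP=false or PRIVATE_IP=0 would still select the private IP. A minimal sketch of a stricter check follows. The helper name use_private_ip and the set of accepted values are my own assumptions, not part of the connector library.

```python
import os

# Values treated as "on". This set is an assumption for illustration,
# not something defined by cloud-sql-python-connector.
_TRUTHY = {"1", "true", "yes", "on"}


def use_private_ip(env=None, var_name="PRIVATE_IP"):
    """Return True only when the variable holds an explicit truthy value.

    `env` defaults to os.environ; passing a plain dict makes the helper
    easy to exercise without touching the real environment.
    """
    env = os.environ if env is None else env
    return env.get(var_name, "").strip().lower() in _TRUTHY
```

It would be used as `ip_type = IPTypes.PRIVATE if use_private_ip() else IPTypes.PUBLIC`. This does not affect the 403 error; it only makes the IP selection less surprising.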
When I try to create the connection, I get the following error:
aiohttp.client_exceptions.ClientResponseError: 403, message="Forbidden: Authenticated IAM principal does not seeem authorized to make API request. Verify 'Cloud SQL Admin API' is enabled within your GCP project and 'Cloud SQL Client' role has been granted to IAM principal.", url='https://sqladmin.googleapis.com/sql/v1beta4/projects/kissflow-prod/instances/kissflowdb/connectSettings'
I then enabled the Cloud SQL Admin API and added the Cloud SQL Client role to sa_project_a. I re-ran the pipeline, but the same error persisted.
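The URL in the error above names the project and instance that the Admin API call targeted, so it may be worth confirming that this is the project where the role grant and the API enablement were actually applied. A minimal sketch for pulling those values out and comparing them with the connection name follows. Both helper names are hypothetical, and the connection name is assumed to have the three-part project:region:instance form.

```python
from urllib.parse import urlparse


def sqladmin_target(url):
    """Extract (project, instance) from a Cloud SQL Admin API instance URL."""
    parts = urlparse(url).path.strip("/").split("/")
    # Path shape taken from the URL in the error message:
    # sql/v1beta4/projects/<project>/instances/<instance>/connectSettings
    try:
        project = parts[parts.index("projects") + 1]
        instance = parts[parts.index("instances") + 1]
    except (ValueError, IndexError):
        raise ValueError(f"not an instance URL: {url!r}") from None
    return project, instance


def connection_name_project(connection_name):
    """Return the project part of a 'project:region:instance' name.

    Assumption: the plain three-part form; other forms are rejected.
    """
    fields = connection_name.split(":")
    if len(fields) != 3 or not all(fields):
        raise ValueError(f"unexpected connection name: {connection_name!r}")
    return fields[0]
```

For the URL in my error, sqladmin_target returns the pair ("kissflow-prod", "kissflowdb"). If that project differs from the one where I enabled the API and granted the role, that would be a lead to follow.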
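As a quick workaround, I created a new service account, sa_project_b, in project_b (the same project as the Cloud SQL instance), with the same roles as sa_project_a. I stored its credentials in Secret Manager and retrieve them before creating the connection.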

    import json
    from google.cloud import secretmanager
    from google.oauth2 import service_account

    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC
    # Fetch the sa_project_b key (JSON) from Secret Manager.
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{self.project_id}/secrets/{self.secret_id}/versions/latest"
    request = client.access_secret_version(request={"name": name})
    response = json.loads(request.payload.data.decode("UTF-8"))
    # Build explicit credentials and hand them to the connector.
    credentials = service_account.Credentials.from_service_account_info(response)
    connector = Connector(ip_type=ip_type, credentials=credentials)

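Because the workaround depends on the secret holding a well-formed service account key, a small sanity check before building the credentials can make failures easier to read. This is only a sketch: the helper name parse_service_account_key and the list of checked fields are my own assumptions, not an official schema.

```python
import json

# Fields checked before building credentials; an assumed minimal set for a
# quick sanity check, not an exhaustive description of a key file.
REQUIRED_FIELDS = ("type", "private_key", "client_email", "token_uri")


def parse_service_account_key(payload: bytes) -> dict:
    """Decode a secret payload and verify it looks like a service account key."""
    info = json.loads(payload.decode("utf-8"))
    missing = [field for field in REQUIRED_FIELDS if field not in info]
    if missing:
        raise ValueError(f"secret is missing fields: {missing}")
    if info["type"] != "service_account":
        raise ValueError(f"unexpected key type: {info['type']!r}")
    return info
```

The returned dict can be passed to service_account.Credentials.from_service_account_info, and logging info["client_email"] shows which account the connector will actually use.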
With this change, I was able to create the connection without any problems.
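I suspect that sa_project_a may not have permission to use the Cloud SQL Admin API, because I used exactly the same script and changed only the service account, switching to one created in the same project as the Cloud SQL instance. My guess is that sa_project_b can access the Cloud SQL Admin API automatically because it was created in that project.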