# minio_manage.py (4.3 KB)
  1. from minio import Minio
  2. from minio.error import S3Error
  3. import json
  4. import io
  5. class MinIO_globalCfg:
  6. endpoint = '35.tcp.cpolar.top:13040'
  7. #endpoint = "117.72.64.146:6614"
  8. access_key = "dongri"
  9. secret_key = "12345678"
  10. bucket_name = "dongri"
  11. secure = False
  12. class MinIOConfigManager:
  13. def __init__(self, endpoint=MinIO_globalCfg.endpoint,
  14. access_key=MinIO_globalCfg.access_key,
  15. secret_key=MinIO_globalCfg.secret_key, bucket_name=MinIO_globalCfg.bucket_name):
  16. self.client = Minio(
  17. endpoint,
  18. access_key=access_key,
  19. secret_key=secret_key,
  20. secure=False # 根据实际环境调整 HTTPS
  21. )
  22. self.bucket_name = bucket_name
  23. self._ensure_bucket_exists()
  24. def _ensure_bucket_exists(self):
  25. """确保存储桶存在"""
  26. try:
  27. if not self.client.bucket_exists(self.bucket_name):
  28. self.client.make_bucket(self.bucket_name)
  29. except S3Error as e:
  30. raise Exception(f"Bucket operation failed: {e}")
  31. def set_config(self, key, value):
  32. try:
  33. # 显式展示 value 的处理流程
  34. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  35. # 创建文件流对象
  36. stream = io.BytesIO(data_bytes)
  37. stream.seek(0) # 重置指针位置[6](@ref)
  38. # 上传到 MinIO(value_bytes 即处理后的 value)
  39. self.client.put_object(
  40. bucket_name=self.bucket_name,
  41. object_name=key,
  42. data=stream, # 最终使用 value
  43. length=len(data_bytes)
  44. )
  45. print(f"Config '{key}' saved.")
  46. except Exception as e:
  47. print(f"Error: {e}")
  48. def get_config(self, key):
  49. try:
  50. response = self.client.get_object(self.bucket_name, key)
  51. data = response.read().decode('utf-8') # 正确调用 read() 方法
  52. return data
  53. except S3Error as e:
  54. if e.code == "NoSuchKey":
  55. print(f"Config '{key}' not found.")
  56. return None
  57. print(f"Error fetching config: {e}")
  58. return None
  59. finally:
  60. if 'response' in locals():
  61. response.close()
  62. def list_configs(self):
  63. """列出所有配置键"""
  64. try:
  65. objects = self.client.list_objects(self.bucket_name)
  66. return [obj.object_name for obj in objects]
  67. except S3Error as e:
  68. print(f"Error listing configs: {e}")
  69. return []
  70. def delete_config(self, key):
  71. """删除配置"""
  72. try:
  73. self.client.remove_object(self.bucket_name, key)
  74. print(f"Config '{key}' deleted.")
  75. except S3Error as e:
  76. print(f"Error deleting config: {e}")
  77. def testMinIO():
  78. client = Minio("117.72.64.146:6614", access_key="dongri", secret_key="12345678", secure=False)
  79. # 创建桶并上传文件
  80. value = {"key": "value"} # 可以是任意数据
  81. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  82. # 创建文件流对象
  83. stream = io.BytesIO(data_bytes)
  84. stream.seek(0) # 重置指针位置[6](@ref)
  85. client.put_object(
  86. bucket_name="test-bucket",
  87. object_name="tesss111112st",
  88. data=stream,
  89. length=len(data_bytes),
  90. )
  91. print("功能验证成功")
  92. if __name__ == "__main__":
  93. #testMinIO()
  94. # 初始化客户端(替换为你的 MinIO 地址和密钥)
  95. config_manager = MinIOConfigManager(
  96. endpoint=MinIO_globalCfg.endpoint,
  97. access_key=MinIO_globalCfg.access_key,
  98. secret_key=MinIO_globalCfg.secret_key,
  99. bucket_name=MinIO_globalCfg.bucket_name
  100. )
  101. # 设置配置
  102. config_manager.set_config("database", {"host": "127.0.0.1", "port": 3306})
  103. config_manager.set_config("app_settings", {"theme": "dark", "timeout": 30})
  104. # 获取配置
  105. db_config = config_manager.get_config("database")
  106. print("Database config:", db_config) # 输出: {'host': '127.0.0.1', 'port': 3306}
  107. # 列出所有配置键
  108. print("All config keys:", config_manager.list_configs()) # 输出: ['database', 'app_settings']
  109. # 删除配置
  110. config_manager.delete_config("app_settings")