Python SDK
目前只支持python 3.6及以上的版本。
1. 创建客户端
可以创建同步客户端或者异步客户端。
- 同步客户端
 
from pyproximabe import *
client = Client('127.0.0.1', 16000)
Proxima BE服务支持 GRPC 和 HTTP 两种协议,监听在不同的端口上。同步客户端同时支持这两种协议,默认使用GRPC协议。如果有需求,可以用下面的方式来指定协议。
client = Client('127.0.0.1', 16001, 'http')
- 异步客户端
 
from pyproximabe import *
client = AsyncClient(HOST, GRPC_PORT)
异步客户端基于asyncio实现,除了需要await结果外,和同步客户端的使用方式完全一致。 异步客户端只支持GRPC协议。
同步或异步客户端支持的参数如下
def __init__(self, host, port=16000, handler='grpc', timeout=10):
| 参数 | 说明 | 
|---|---|
| host | 服务器地址,str类型 | 
| port | 服务器端口,int类型 | 
| handler | 协议类型,str类型,一般不需要指定。 同步客户端支持grpc/http,异步客户端只支持grpc。  | 
| timeout | 超时时间,单位秒,float类型,默认为10。指定为None则不超时。 | 
使用完毕后,可以调用close()关闭客户端,其参数为空,返回值为None。
client.close()
2. 集合管理
2.1. 集合创建
创建直写集合
collection_name = 'iris'
index_column = IndexColumnParam('length', 4, IndexType.PROXIMA_GRAPH_INDEX)
collection_config = CollectionConfig(collection_name, [index_column], ['iris_type'])
status = client.create_collection(collection_config)
if not status.ok():
    # error handling
    logging.error('create collection failed, status=%s', status)
首先,创建一个或多个索引列
index_column = IndexColumnParam('length', 4, IndexType.PROXIMA_GRAPH_INDEX)
参数如下
def __init__(self,
             name,
             dimension,
             index_type=IndexType.PROXIMA_GRAPH_INDEX,
             data_type=DataType.VECTOR_FP32,
             extra_params=None):
| 参数 | 说明 | 
|---|---|
| name | 索引列名称,str类型。 | 
| dimension | 向量维度,int类型。 | 
| index_type | 索引类型,IndexType 类型,默认为PROXIMA_GRAPH_INDEX。 | 
| data_type | 数据类型,DataType 类型,默认为VECTOR_FP32。 | 
| extra_params | 扩展参数,dict类型,默认为空。 | 
然后,创建CollectionConfig。
collection_config = CollectionConfig(collection_name, [index_column], ['iris_type'])
(python_collection_config)= 参数如下
def __init__(self,
                collection_name,
                index_column_params,
                forward_column_names=None,
                repository_config=None,
                max_docs_per_segment=0):
| 参数 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| index_column_params | 索引列列表,List[IndexColumnParam]类型。 | 
| forward_column_names | 正排列列表,List[str]类型。默认为空。 | 
| repository_config | 仓库配置,DatabaseRepository类型。默认为None。 | 
| max_docs_per_segment | 每个segment的最大文档数,long类型。默认为0,文档数不限。 | 
最后,调用client接口。
status = client.create_collection(collection_config)
if not status.ok():
    # error handling
    logging.error('create collection failed, status=%s', status)
create_collection()接收CollectionConfig参数,返回ProximaBeStatus。
def create_collection(self, collection_config):
创建数据库旁路集合
MYSQL_PORT = 3306
MYSQL_HOST = HOST
mysql_table_name = 'iris_table'
mysql_database_name = 'test_db'
mysql_user='root'
mysql_password='root'
index_column = IndexColumnParam('length', 4, IndexType.PROXIMA_GRAPH_INDEX)
database_repository = DatabaseRepository('test_repository',
                                         f'mysql://{MYSQL_HOST}:{MYSQL_PORT}/{mysql_database_name}',
                                         mysql_table_name, mysql_user, mysql_password)
collection_config = CollectionConfig(collection_name, [index_column], ['iris_type'], database_repository)
status = client.create_collection(collection_config)
除了需要创建DatabaseRepository外,其他参数和直写集合完全相同。目前只支持mysql。
database_repository = DatabaseRepository('test_repository',
                                         f'mysql://{MYSQL_HOST}:{MYSQL_PORT}/{mysql_database_name}',
                                         mysql_table_name, mysql_user, mysql_password)
参数列表如下
def __init__(self, repository_name, connection_uri, table_name, user, password):
| 参数 | 说明 | 
|---|---|
| repository_name | 仓库名称,str类型。 | 
| connection_uri | 数据库连接串,str类型,如"mysql://localhost/database",不包含用户名和密码。 | 
| table_name | 表名,str类型。 | 
| user | 用户名,str类型。 | 
| password | 数据库密码,str类型。 | 
2.2. 描述集合
status, collection_info = client.describe_collection(collection_name)
if not status.ok():
    pass  # error handling
print(collection_info)
describe_collection()接收str类型的collection_name
def describe_collection(self, collection_name):
返回值有两个
2.3. 获取集合统计信息
status, collection_stats = client.stats_collection(collection_name)
if not status.ok():
    pass  # error handling
print(collection_stats)
stats_collection()接收str类型的collection_name
def stats_collection(self, collection_name):
返回值有两个
2.4. 删除集合
status = client.drop_collection(collection_name)
drop_collection()接收str类型的collection_name,返回ProximaBeStatus。
def drop_collection(self, collection_name):
2.5. 获取集合列表
status, collections_data = client.list_collections()
list_collections()参数列表如下
def list_collections(self, repository_name=None):
| 参数 | 说明 | 
|---|---|
| repository_name | 仓库名称,str类型,默认为空。参数为空时,返回所有的集合列表,否则返回指定仓库下的集合列表。 | 
| 返回值有两个 | 
3. 文档管理
3.1. 通用接口
import struct
row_meta = WriteRequest.RowMeta([WriteRequest.IndexColumnMeta('length', 'VECTOR_FP32', 4)],
                                ['iris_type'],
                                [DataType.STRING])
rows = [
    WriteRequest.Row(100001,
                     WriteRequest.OperationType.INSERT,
                     [[5.9,3.0,5.1,1.8]],
                     ['Iris-virginica']),
    WriteRequest.Row(100002,
                     WriteRequest.OperationType.INSERT,
                     ['[5.9,3.0,5.1,1.8]'],
                     ['Iris-virginica']),
    WriteRequest.Row(10002,
                     WriteRequest.OperationType.UPDATE,
                     [struct.pack('<4f', 5.9, 3.0, 5.1, 1.8)],
                     ['Iris-virginica']),
    WriteRequest.Row(10003, WriteRequest.OperationType.DELETE),
]
write_request = WriteRequest(collection_name, rows, row_meta)
status = client.write(write_request)
首先,创建WriteRequest.RowMeta,描述插入行的数据类型等信息。
row_meta = WriteRequest.RowMeta([WriteRequest.IndexColumnMeta('length', 'VECTOR_FP32', 4)],
                                ['iris_type'],
                                [DataType.STRING])
WriteRequest.IndexColumnMeta的参数如下
def __init__(self, name, data_type, dimension):
| 参数 | 说明 | 
|---|---|
| name | 索引列名称,str类型。 | 
| data_type | 数据类型,DataType 类型。 | 
| dimension | 向量维度,int类型。 | 
WriteRequest.RowMeta的参数如下 | 
def __init__(self,
             index_column_metas,
             forward_column_names=None,
             forward_column_types=None):
| 参数 | 说明 | 
|---|---|
| index_column_metas | 索引列列表,List[WriteRequest.IndexColumnMeta]类型。 | 
| forward_column_names | 正排列名称列表,List[str]。默认为空。 | 
| forward_column_types | 正排列类型列表,List[DataType]。默认为空。 | 
然后创建WriteRequest.Row。
    WriteRequest.Row(100001,
                     WriteRequest.OperationType.INSERT,
                     [[5.9,3.0,5.1,1.8]],
                     ['Iris-virginica']),
WriteRequest.Row的参数如下
 def __init__(self,
              primary_key,
              operation_type,
              index_column_values=None,
              forward_column_values=None,
              lsn_context=None):
| 参数 | 说明 | 
|---|---|
| primary_key | 主键,long类型。 | 
| operation_type | 操作类型, WriteRequest.OperationType 类型。 | 
| index_column_values | 索引列的值列表,list类型,operation_type为删除时不需要指定。支持三种类型
  | 
| forward_column_values | 正排列的值列表,list类型。 | 
| lsn_context | 日志序列号上下文,LsnContext类型,默认为空。一般不需要设置。 | 
然后创建WriteRequest | 
write_request = WriteRequest(collection_name, rows, row_meta)
WriteRequest参数如下
def __init__(self,
             collection_name,
             rows,
             row_meta=None,
             request_id=None,
             magic_number=None):
| 参数 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| rows | 文档列表,List[WriteRequest.Row]类型。 | 
| row_meta | 文档元数据,WriteRequest.RowMeta类型。如果所有的operation_type都为DELETE,不需要指定。 | 
| request_id | 请求id,str类型,默认为空。一般不需要设置。 | 
| magic_number | 服务端魔数,long类型,默认为0。一般不需要设置。 | 
最后调用client接口。
status = client.write(write_request)
write()接收WriteRequest参数,返回ProximaBeStatus。
def write(self, write_request):
3.2. 删除接口
status = client.delete_document_by_keys(collection_name, [10001, 10002])
在只需要删除时,python sdk提供了简化接口,只需要指定集合名称和主键列表。 参数如下
def delete_document_by_keys(self, collection_name, primary_keys):
| 参数 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| primary_keys | 主键列表,List[long]类型。 | 
delete_document_by_keys()返回值为ProximaBeStatus。 | 
4. 文档查询
4.1. 向量查询
status, knn_res = client.query(collection_name,
                               'length',
                               [[5.1, 3.5, 1.4, 0.2],
                                         [5.5, 2.3, 4.0, 1.3]],
                               'VECTOR_FP32',
                               topk=2)
for i, result in enumerate(knn_res.results):
    print(f'Query: {i}')
    for doc in result:
        forward_values = ','.join(
            f'{k}={v}' for k, v in doc.forward_column_values.items())
        print(
            f'    primary_key={doc.primary_key}, score={doc.score}, forward_column_values=[{forward_values}]'
        )
query()的参数如下
def query(self,
          collection_name,
          column_name,
          features,
          data_type=None,
          dimension=None,
          batch_count=None,
          topk=100,
          is_linear=False,
          radius=None,
          extra_params=None):
| 参数 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| column_name | 索引列名称,str类型。 | 
| features | 特征,支持三种类型
  | 
| data_type | 数据类型,DataType 类型。默认为空。
  | 
| dimension | 向量维度,int类型。默认为空。
  | 
| batch_count | batch大小,int类型。默认为空。
  | 
| topk | 单条查询向量返回的结果数,int类型。默认为100。 | 
| is_linear | 是否做线性查找,bool类型。默认为False,基于索引做查找。 | 
| radius | 搜索半径,只返回以搜索向量为球心的球体内的向量,float类型。默认为0.0,搜索半径不限。 | 
| extra_params | 扩展参数,dict类型。 | 
| 返回值有两个 | 
4.2. 主键查询
status, res = client.get_document_by_key(collection_name, primary_key=100001)
get_document_by_key()的参数如下
def get_document_by_key(self, collection_name, primary_key):
| 参数 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| primary_key | 主键,long类型。 | 
| 返回值有两个 | 
- ProximaBeStatus。主键不存在时,status.ok() == True.
 - Document。主键不存在时,返回None。
 
5. 错误处理
错误处理
python sdk接口一般会返回ProximaBeStatus类型,其包含两个属性
| 属性 | 说明 | 
|---|---|
| code | 错误码,int类型。 | 
| reason | 错误详情,str类型。 | 
可以通过ok()接口来判断是否成功。
def ok(self):
    return self.code == 0
:class: warning
请求服务端成功时才会返回ProximaBeStatus,如果客户端参数检查失败或者网络有问题,会抛出ProximaSeException异常。
class ProximaSeException(Exception):
    pass
6. 类型定义
IndexType
class IndexType(IntEnum):
    UNDEFINED = 0
    PROXIMA_GRAPH_INDEX = 1
DataType
class DataType(IntEnum):
    UNDEFINED = 0
    BINARY = 1
    STRING = 2
    BOOL = 3
    INT32 = 4
    INT64 = 5
    UINT32 = 6
    UINT64 = 7
    FLOAT = 8
    DOUBLE = 9
    VECTOR_BINARY32 = 20
    VECTOR_BINARY64 = 21
    VECTOR_FP16 = 22
    VECTOR_FP32 = 23
    VECTOR_FP64 = 24
    VECTOR_INT4 = 25
    VECTOR_INT8 = 26
    VECTOR_INT16 = 27
WriteRequest.OperationType
class OperationType(IntEnum):
    INSERT = 0
    UPDATE = 1
    DELETE = 2
Document
Document 包含以下属性
| 属性 | 说明 | 
|---|---|
| primary_key | 主键,long类型。 | 
| score | 分值,即距离查询向量的距离,float类型。 | 
| forward_column_values | 正排名称到值的映射,dict类型。 | 
QueryResponse
QueryResponse包含以下属性
| 属性 | 说明 | 
|---|---|
| results | 结果列表,List[List[Document]]类型。第一重list表示多个查询向量的结果,第二重list表示每个向量的Document列表 | 
| debug_info | 服务端调试信息,str类型。 | 
| latency_us | 服务端统计的耗时,单位为微秒。 | 
CollectionInfo
CollectionInfo包含以下属性
| 属性 | 说明 | 
|---|---|
| collection_config | 集合配置, CollectionConfig类型。 | 
| status | 集合状态,CollectionInfo.Status 类型。 | 
| uuid | 集合唯一标识,str类型。 | 
| latest_lsn_context | 最新日志序列号上下文, LsnContext 类型。 | 
| magic_number | 服务端魔数,long类型。 | 
{ref}CollectionConfig定义见这里<python_collection_config>
CollectionInfo.Status定义
class Status(IntEnum):
    """Collection Status"""
    INITIALIZED = 0
    SERVING = 1
    DROPPED = 2
CollectionStats
CollectionStats包含以下属性
| 属性 | 说明 | 
|---|---|
| collection_name | 集合名称,str类型。 | 
| collection_path | 集合路径,str类型。 | 
| total_doc_count | 文档总数,long类型。 | 
| total_segment_count | 分段总个数,long类型。 | 
| total_index_file_count | 索引文件总个数,long类型。 | 
| total_index_file_size | 索引文件总大小,long类型。 | 
| segment_stats | 分段统计信息,List[CollectionStats.SegmentStats]类型。 | 
CollectionStats.SegmentStats包含以下属性
| 属性 | 说明 | 
|---|---|
| segment_id | 段id,int类型。 | 
| state | 段状态,CollectionStats.SegmentState类型。 | 
| doc_count | 文档数,long类型。 | 
| index_file_count | 索引文件个数,long类型。 | 
| index_file_size | 索引文件大小,long类型。 | 
| min_doc_id | 当前分段的最小文档id,long类型。 | 
| max_doc_id | 当前分段的最大文档id,long类型。 | 
| min_primary_key | 当前分段的最小主键,long类型。 | 
| max_primary_key | 当前分段的最大主键,long类型。 | 
| min_timestamp | 当前分段的最小时间戳,long类型。 | 
| max_timestamp | 当前分段的最大时间戳,long类型。 | 
| min_lsn | 当前分段的最小日志序列号,long类型。 | 
| max_lsn | 当前分段的最大日志序列号,long类型。 | 
| segment_path | 段文件路径,str类型。 | 
CollectionStats.SegmentState定义如下
class SegmentState(IntEnum):
    CREATED = 0
    WRITING = 1
    DUMPING = 2
    COMPACTING = 3
    PERSIST = 4
LsnContext
LsnContext 包含以下属性
| 属性 | 说明 | 
|---|---|
| lsn | 序列号,long类型。 | 
| context | 上下文,str类型。 |