本文将详细介绍如何使用 Python 开发一个完整的配送管理软件。该系统将包含用户管理、订单处理、配送员调度和实时追踪等核心功能。我们将采用面向对象的设计方法,确保代码的可扩展性和可维护性。

系统架构设计
1.核心模块划分
```python
"""
配送管理系统 - 主程序结构
"""
import datetime
import json
import uuid
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple
# 数据模型定义
class OrderStatus(Enum):
    """Lifecycle states of an order.

    Each member's value is the Chinese display label; the same value is
    what gets written to and read back from the JSON data file.
    """

    PENDING = "待接单"      # created, waiting for a courier
    ACCEPTED = "已接单"     # a courier has been assigned
    PICKED_UP = "已取货"    # goods collected at the pickup address
    DELIVERING = "配送中"   # on the way to the delivery address
    DELIVERED = "已送达"    # handed over to the customer
    CANCELLED = "已取消"    # order cancelled
class UserType(Enum):
    """Role of a registered user.

    Each member's value is the Chinese display label, which is also the
    representation stored in the JSON data file and accepted by the
    registration API.
    """

    CUSTOMER = "客户"            # places orders
    DELIVERY_PERSON = "配送员"   # courier who delivers orders
    ADMIN = "管理员"             # administrator
@dataclass
class User:
    """Account record for a customer, courier, or administrator."""

    user_id: str
    username: str
    # NOTE(review): stored exactly as supplied at registration; a real
    # deployment should store a salted hash instead of the raw password.
    password: str
    user_type: UserType
    phone: str
    email: str
    registration_date: datetime.datetime
@dataclass
class Order:
    """A single delivery order and its current state."""

    order_id: str
    customer_id: str
    # None until a courier has been assigned to the order.
    delivery_person_id: Optional[str]
    pickup_address: str
    delivery_address: str
    # Line items; order creation reads the 'price' and 'quantity' keys.
    items: List[Dict]
    total_amount: float
    status: OrderStatus
    created_at: datetime.datetime
    estimated_delivery_time: datetime.datetime
    # None until the order reaches the DELIVERED status.
    actual_delivery_time: Optional[datetime.datetime]
```
2.核心类实现
```python
class DeliverySystem:
    """Core of the delivery system: users, orders, and JSON persistence.

    Users and orders are held in dictionaries keyed by their IDs and are
    written to ``data_file`` by the mutating methods of this class.
    Courier runtime state (``active_delivery_persons``) is kept in memory
    only and is not persisted.
    """

    def __init__(self, data_file: str = 'delivery_data.json'):
        """Create the system and load previously saved state, if any.

        Args:
            data_file: Path of the JSON file used by ``save_data`` and
                ``load_data``.  The default is the location the system
                has always used.
        """
        self.data_file = data_file
        self.users: Dict[str, User] = {}
        self.orders: Dict[str, Order] = {}
        # courier user_id -> runtime info dict (e.g. 'current_order',
        # 'latitude', 'longitude', 'last_update')
        self.active_delivery_persons: Dict[str, Dict] = {}
        self.load_data()

    def register_user(self, username: str, password: str,
                      user_type: UserType, phone: str, email: str) -> str:
        """Register a new user and persist the change.

        Returns:
            The generated 8-character user ID.

        Raises:
            ValueError: If ``username`` is already taken.
        """
        if any(u.username == username for u in self.users.values()):
            raise ValueError("用户名已存在")

        user_id = str(uuid.uuid4())[:8]
        # A truncated UUID can (rarely) repeat; never overwrite a user.
        while user_id in self.users:
            user_id = str(uuid.uuid4())[:8]

        user = User(
            user_id=user_id,
            username=username,
            # NOTE(review): stored as received; a real deployment should
            # store a salted hash instead of the raw password.
            password=password,
            user_type=user_type,
            phone=phone,
            email=email,
            registration_date=datetime.datetime.now(),
        )
        self.users[user_id] = user
        self.save_data()
        return user_id

    def create_order(self, customer_id: str, pickup_address: str,
                     delivery_address: str, items: List[Dict]) -> str:
        """Create an order, try to assign a courier, and persist it.

        Args:
            customer_id: ID of an existing user.
            pickup_address: Where the goods are collected.
            delivery_address: Where the goods are delivered.
            items: Line items; each must provide 'price' and 'quantity'.

        Returns:
            The new order ID (``ORD`` + creation timestamp).

        Raises:
            ValueError: If ``customer_id`` is not a registered user.
        """
        if customer_id not in self.users:
            raise ValueError("用户不存在")

        # Sample the clock once so the ID, creation time and estimate agree.
        now = datetime.datetime.now()
        base_id = f"ORD{now.strftime('%Y%m%d%H%M%S')}"
        order_id = base_id
        # The timestamp has one-second resolution; add a random suffix
        # rather than overwrite an order created in the same second.
        while order_id in self.orders:
            order_id = f"{base_id}-{uuid.uuid4().hex[:4]}"

        total_amount = sum(item['price'] * item['quantity'] for item in items)

        order = Order(
            order_id=order_id,
            customer_id=customer_id,
            delivery_person_id=None,
            pickup_address=pickup_address,
            delivery_address=delivery_address,
            items=items,
            total_amount=total_amount,
            status=OrderStatus.PENDING,
            created_at=now,
            # Fixed two-hour delivery estimate.
            estimated_delivery_time=now + datetime.timedelta(hours=2),
            actual_delivery_time=None,
        )
        self.orders[order_id] = order
        self.assign_delivery_person(order_id)
        self.save_data()
        return order_id

    def assign_delivery_person(self, order_id: str) -> Optional[str]:
        """Assign a courier to a pending order.

        Returns:
            The assigned courier's user ID, or ``None`` when the order does
            not exist, is not pending, or no courier is available.
        """
        order = self.orders.get(order_id)
        if not order or order.status != OrderStatus.PENDING:
            return None

        # Simple strategy: the first registered courier that is currently
        # active wins (rating and distance are not taken into account).
        assigned_person = next(
            (
                user_id
                for user_id, user in self.users.items()
                if user.user_type == UserType.DELIVERY_PERSON
                and user_id in self.active_delivery_persons
            ),
            None,
        )
        if assigned_person is None:
            return None

        order.delivery_person_id = assigned_person
        order.status = OrderStatus.ACCEPTED
        # Record on the courier which order they are handling.
        self.active_delivery_persons[assigned_person]['current_order'] = order_id
        self.save_data()
        return assigned_person

    def update_order_status(self, order_id: str,
                            status: OrderStatus,
                            delivery_person_id: Optional[str] = None) -> bool:
        """Set the status of an order and persist the change.

        Args:
            order_id: ID of the order to update.
            status: The new status.
            delivery_person_id: When given, the update is only allowed if
                this courier is the one assigned to the order.

        Returns:
            ``True`` on success; ``False`` if the order is unknown or the
            courier check fails.
        """
        order = self.orders.get(order_id)
        if not order:
            return False

        # Courier permission check (skipped when no courier ID is given).
        if delivery_person_id and order.delivery_person_id != delivery_person_id:
            return False

        order.status = status
        if status == OrderStatus.DELIVERED:
            order.actual_delivery_time = datetime.datetime.now()
        self.save_data()
        return True

    def calculate_distance(self, address1: str, address2: str) -> float:
        """Return the distance between two addresses (simplified stub).

        The addresses are currently ignored and a fixed distance is
        returned; a real implementation should call a map API.
        """
        return 5.0

    def get_available_orders(self) -> List[Order]:
        """Return all orders that are still waiting for a courier."""
        return [
            order for order in self.orders.values()
            if order.status == OrderStatus.PENDING
        ]

    def save_data(self):
        """Write all users and orders to ``self.data_file`` as JSON."""
        data = {
            'users': {uid: asdict(user) for uid, user in self.users.items()},
            'orders': {oid: asdict(order) for oid, order in self.orders.items()},
        }

        # Enums and datetimes are not JSON-serializable: store the enum
        # value and the ISO-8601 form of each timestamp instead.
        for user_data in data['users'].values():
            user_data['user_type'] = user_data['user_type'].value
            user_data['registration_date'] = user_data['registration_date'].isoformat()

        for order_data in data['orders'].values():
            order_data['status'] = order_data['status'].value
            order_data['created_at'] = order_data['created_at'].isoformat()
            order_data['estimated_delivery_time'] = (
                order_data['estimated_delivery_time'].isoformat()
            )
            if order_data['actual_delivery_time']:
                order_data['actual_delivery_time'] = (
                    order_data['actual_delivery_time'].isoformat()
                )

        with open(self.data_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def load_data(self):
        """Load users and orders from ``self.data_file``.

        A missing file is treated as a first run and leaves the system
        empty.
        """
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # First run: there is no data file yet.
            return

        # Load users.
        for uid, user_data in data.get('users', {}).items():
            self.users[uid] = User(
                user_id=uid,
                username=user_data['username'],
                password=user_data['password'],
                user_type=UserType(user_data['user_type']),
                phone=user_data['phone'],
                email=user_data['email'],
                registration_date=datetime.datetime.fromisoformat(
                    user_data['registration_date']
                ),
            )

        # Load orders.
        for oid, order_data in data.get('orders', {}).items():
            delivered_at = order_data['actual_delivery_time']
            self.orders[oid] = Order(
                order_id=oid,
                customer_id=order_data['customer_id'],
                delivery_person_id=order_data['delivery_person_id'],
                pickup_address=order_data['pickup_address'],
                delivery_address=order_data['delivery_address'],
                items=order_data['items'],
                total_amount=order_data['total_amount'],
                status=OrderStatus(order_data['status']),
                created_at=datetime.datetime.fromisoformat(
                    order_data['created_at']
                ),
                estimated_delivery_time=datetime.datetime.fromisoformat(
                    order_data['estimated_delivery_time']
                ),
                actual_delivery_time=(
                    datetime.datetime.fromisoformat(delivered_at)
                    if delivered_at else None
                ),
            )
```
3.WebAPI接口(使用Flask)
```python
from flask import Flask, request, jsonify, render_template

# Flask application and the single shared DeliverySystem instance that all
# route handlers below operate on.
app = Flask(__name__)
delivery_system = DeliverySystem()
@app.route('/')
def index():
    """Serve the home page from the ``index.html`` template."""
    return render_template('index.html')
@app.route('/api/register', methods=['POST'])
def register():
    """Registration API.

    Expects a JSON body with 'username', 'password', 'user_type' (a
    ``UserType`` value), 'phone' and 'email'.  Responds with
    ``{'success': True, 'user_id': ...}`` or, on any failure,
    ``{'success': False, 'error': <message>}``.
    """
    data = request.json
    try:
        user_id = delivery_system.register_user(
            username=data['username'],
            password=data['password'],
            user_type=UserType(data['user_type']),
            phone=data['phone'],
            email=data['email'],
        )
        return jsonify({'success': True, 'user_id': user_id})
    except Exception as e:
        # API boundary: report every failure to the client as JSON.
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/create_order', methods=['POST'])
def create_order():
    """Order-creation API.

    Expects a JSON body with 'customer_id', 'pickup_address',
    'delivery_address' and 'items'.  Responds with
    ``{'success': True, 'order_id': ...}`` or, on any failure,
    ``{'success': False, 'error': <message>}``.
    """
    data = request.json
    try:
        order_id = delivery_system.create_order(
            customer_id=data['customer_id'],
            pickup_address=data['pickup_address'],
            delivery_address=data['delivery_address'],
            items=data['items'],
        )
        return jsonify({'success': True, 'order_id': order_id})
    except Exception as e:
        # API boundary: report every failure to the client as JSON.
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/track_order/<order_id>')
def track_order(order_id):
    """Order-tracking API: return the order identified by ``order_id``.

    Responds with ``{'success': True, 'order': {...}, 'estimated_time': ...}``
    or ``{'success': False, 'error': ...}`` when the order does not exist.
    """
    order = delivery_system.orders.get(order_id)
    if not order:
        return jsonify({'success': False, 'error': '订单不存在'})

    # asdict() leaves the OrderStatus enum and datetime objects in place;
    # the enum cannot be JSON-encoded, so convert both explicitly (status
    # to its value, timestamps to ISO-8601 like 'estimated_time' below).
    order_data = asdict(order)
    order_data['status'] = order.status.value
    for key in ('created_at', 'estimated_delivery_time', 'actual_delivery_time'):
        if order_data[key] is not None:
            order_data[key] = order_data[key].isoformat()

    return jsonify({
        'success': True,
        'order': order_data,
        'estimated_time': order.estimated_delivery_time.isoformat(),
    })
@app.route('/api/update_location', methods=['POST'])
def update_location():
    """Update a courier's current position.

    Expects a JSON body with 'delivery_person_id', 'latitude' and
    'longitude'.  The position is stored only for couriers present in
    ``delivery_system.active_delivery_persons``; other IDs are ignored,
    and the response is ``{'success': True}`` in both cases.
    """
    data = request.json
    delivery_person_id = data['delivery_person_id']
    latitude = data['latitude']
    longitude = data['longitude']

    # Update the courier's in-memory position record.
    if delivery_person_id in delivery_system.active_delivery_persons:
        delivery_system.active_delivery_persons[delivery_person_id].update({
            'latitude': latitude,
            'longitude': longitude,
            'last_update': datetime.datetime.now().isoformat(),
        })
    return jsonify({'success': True})
if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive debugger and the
    # reloader; use it for local development only.
    app.run(debug=True, port=5000)
```
4.前端界面示例(HTML/CSS/JS)
```html
智能配送管理系统
实时追踪,高效配送
