添加sqlx事务,备注:如果只涉及到单次数据变更操作,可以不使用事务

This commit is contained in:
李运家 2024-09-27 15:11:45 +08:00
parent e9f80e1c83
commit 461edb0f25
6 changed files with 164 additions and 57 deletions

4
Cargo.lock generated
View File

@ -2900,9 +2900,9 @@ checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291"
[[package]]
name = "unicode-segmentation"
version = "1.11.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]]
name = "unicode_categories"

View File

@ -1,8 +1,8 @@
use std::fmt::Display;
use chrono::{DateTime, Utc};
use sqlx::{Error, PgPool};
use sqlx::types::{chrono, JsonValue};
use sqlx::{Error, PgPool, Postgres, Transaction};
#[derive(Debug, PartialEq, Clone)]
pub enum Role {
@ -81,15 +81,18 @@ pub struct Account {
}
impl Account {
pub async fn find_by_apple_id(apple_id: &str, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn find_by_apple_id(
apple_id: &str,
db_pool: &PgPool,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"select * from account where apple_id = $1"#,
apple_id
)
.fetch_one(db_pool)
.await {
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -103,14 +106,18 @@ impl Account {
}
}
pub async fn find_by_custom_id(custom_id: &str, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn find_by_custom_id(
custom_id: &str,
db_pool: &PgPool,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"select * from account where custom_id = $1"#,
custom_id
)
.fetch_one(db_pool)
.await {
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -124,14 +131,18 @@ impl Account {
}
}
pub async fn find_by_facebook_id(facebook_id: &str, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn find_by_facebook_id(
facebook_id: &str,
db_pool: &PgPool,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"select * from account where facebook_id = $1"#,
facebook_id
)
.fetch_one(db_pool)
.await {
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -144,14 +155,19 @@ impl Account {
}
}
}
pub async fn find_by_google_id(google_id: &str, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn find_by_google_id(
google_id: &str,
db_pool: &PgPool,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"select * from account where google_id = $1"#,
google_id
)
.fetch_one(db_pool)
.await {
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -165,7 +181,10 @@ impl Account {
}
}
pub async fn save_google_account(&self, db_pool: &PgPool) -> Result<Account, Error> {
pub async fn save_google_account(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<Account, Error> {
sqlx::query_as!(
Account,
r#"
@ -179,10 +198,16 @@ impl Account {
self.email.clone().unwrap(),
self.display_name.clone().unwrap(),
self.avatar_url.clone().unwrap()
).fetch_one(db_pool).await
)
.fetch_one(&mut **transaction)
.await
}
pub async fn find_with_password(username: String, password: String, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn find_with_password(
username: String,
password: String,
db_pool: &PgPool,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"
@ -190,7 +215,10 @@ impl Account {
"#,
username,
password.as_bytes().to_vec(),
).fetch_one(db_pool).await {
)
.fetch_one(db_pool)
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -204,7 +232,10 @@ impl Account {
}
}
pub async fn add_account(&self, db_pool: &PgPool) -> Result<Option<Account>, Error> {
pub async fn add_account(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<Option<Account>, Error> {
match sqlx::query_as!(
Account,
r#"
@ -219,7 +250,10 @@ impl Account {
self.role.to_string(),
self.wallet,
self.metadata
).fetch_one(db_pool).await {
)
.fetch_one(&mut **transaction)
.await
{
Ok(account) => {
return Ok(Some(account));
}
@ -232,5 +266,4 @@ impl Account {
}
}
}
}

View File

@ -1,8 +1,8 @@
use crate::db_result::CountResult;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use sqlx::types::chrono;
use crate::db_result::CountResult;
use sqlx::{PgPool, Postgres, Transaction};
#[derive(Debug, Clone, Deserialize, Default, Serialize)]
pub struct Feedback {
@ -13,22 +13,31 @@ pub struct Feedback {
}
impl Feedback {
pub async fn search_feedback(page: i64, page_size: i64, db_pool: &PgPool) -> Result<Vec<Feedback>, sqlx::Error> {
pub async fn search_feedback(
page: i64,
page_size: i64,
db_pool: &PgPool,
) -> Result<Vec<Feedback>, sqlx::Error> {
sqlx::query_as!(
Feedback,
r#"select id, user_id, content, created_at from feedback limit $1 offset $2"#,
page_size, page
).fetch_all(db_pool).await
page_size,
page
)
.fetch_all(db_pool)
.await
}
pub async fn count_feedback(db_pool: &PgPool) -> Result<CountResult, sqlx::Error> {
sqlx::query_as!(
CountResult,
r#"select count(*) from feedback"#,
).fetch_one(db_pool).await
sqlx::query_as!(CountResult, r#"select count(*) from feedback"#,)
.fetch_one(db_pool)
.await
}
pub async fn add_feedback(feedback: &mut Feedback, db_pool: &PgPool) -> Result<Feedback, sqlx::Error> {
pub async fn add_feedback(
feedback: &mut Feedback,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<Feedback, sqlx::Error> {
feedback.created_at = chrono::Local::now().naive_local();
sqlx::query_as!(
Feedback,
@ -40,6 +49,32 @@ impl Feedback {
feedback.user_id,
feedback.content,
feedback.created_at
).fetch_one(db_pool).await
)
.fetch_one(&mut **transaction)
.await
}
// pub async fn add_feedback<'e, 'a, E>(
// feedback: &mut Feedback,
// db_pool: &PgPool,
// ) -> Result<Feedback, sqlx::Error>
// where
// E: 'e + Executor<'a, Database = Postgres>,
// {
// let mut transaction = db_pool.begin().await?;
// feedback.created_at = chrono::Local::now().naive_local();
// sqlx::query_as!(
// Feedback,
// r#"insert into feedback
// (user_id, content, created_at)
// values
// ($1, $2, $3) returning *
// "#,
// feedback.user_id,
// feedback.content,
// feedback.created_at
// )
// .fetch_one(&mut *transaction)
// .await
// }
}

View File

@ -26,11 +26,12 @@ pub async fn authenticate_google(
ErrPerm(None)
})?;
let mut transaction = db!().begin().await?;
let account = Account::find_by_google_id(&verify_result.aud, db!()).await?;
let account = match account {
None => {
tracing::info!("账户不存在, {:?}", verify_result);
Account::save_google_account(
match Account::save_google_account(
&Account {
username: verify_result.name,
google_id: Some(verify_result.aud),
@ -39,16 +40,29 @@ pub async fn authenticate_google(
avatar_url: Some(verify_result.picture),
..Default::default()
},
db!(),
&mut transaction,
)
.await?
.await {
Ok(account) => {
transaction.commit().await?;
account
},
Err(err) => {
transaction.rollback().await?;
tracing::error!(error = ?err, "保存Google用户失败");
return Err(ResErr::service("保存Google用户失败"));
}
}
}
Some(account) => {
tracing::info!("账户已存在, {:?}", account);
if let Some(disable_time) = account.disable_time {
if disable_time > Utc::now() {
tracing::error!("账户已禁用");
return Err(ResErr::service(message!(context.get_lang_tag(), MessageId::AccountDisabled)));
return Err(ResErr::service(message!(
context.get_lang_tag(),
MessageId::AccountDisabled
)));
}
}
@ -89,7 +103,10 @@ pub async fn refresh_token(
let account = context.account.clone();
if token::verify_refresh_token(&refresh_token).is_err() {
return Err(ResErr::params(message!(context.get_lang_tag(), MessageId::InvalidToken)));
return Err(ResErr::params(message!(
context.get_lang_tag(),
MessageId::InvalidToken
)));
}
let refresh_token = RefreshTokenResult {

View File

@ -9,7 +9,7 @@ use library::model::response::ResResult;
pub async fn get_feedback_list_by_page(
context: Context,
page: i64,
page_size: i64
page_size: i64,
) -> ResResult<FeedbackPageable> {
if !context.account.role.is_admin() {
tracing::error!("非管理员用户,无法获取反馈信息列表");
@ -21,7 +21,12 @@ pub async fn get_feedback_list_by_page(
return Ok(FeedbackPageable::empty(page, page_size));
}
let total = get_feedback_count().await;
Ok(FeedbackPageable::new(feedback_list.unwrap(), total, page, page_size))
Ok(FeedbackPageable::new(
feedback_list.unwrap(),
total,
page,
page_size,
))
}
/// 获取反馈信息总数
@ -35,20 +40,25 @@ async fn get_feedback_count() -> i64 {
}
/// 添加反馈信息
pub async fn add_feedback(
context: Context,
req: FeedbackAdd
) -> ResResult<()> {
pub async fn add_feedback(context: Context, req: FeedbackAdd) -> ResResult<()> {
let account = context.account;
match Feedback::add_feedback(&mut Feedback{
let mut transaction = db!().begin().await?;
match Feedback::add_feedback(
&mut Feedback {
user_id: account.id.clone(),
content: req.content.unwrap(),
..Default::default()
}, db!()).await {
},
&mut transaction,
)
.await
{
Ok(feedback) => {
transaction.commit().await?;
tracing::info!("添加反馈成功: {:?}", feedback)
}
Err(err) => {
transaction.rollback().await?;
tracing::error!(error = ?err, "添加反馈信息失败");
return Err(library::model::response::ResErr::ErrService(None));
}

View File

@ -74,16 +74,28 @@ pub async fn authenticate_with_password(
/// 仅仅用于管理员维护,不对外暴露接口
pub async fn add_account() -> ResResult<()> {
let account = Account::add_account(
let mut transaction = db!().begin().await?;
let account = match Account::add_account(
&Account {
username: "admin".to_string(),
password: Some("123456".as_bytes().to_vec()),
role: Role::Admin,
..Default::default()
},
db!(),
&mut transaction,
)
.await?;
.await {
Ok(account) => {
transaction.commit().await?;
account
},
Err(err) => {
transaction.rollback().await?;
tracing::error!(error = ?err, "添加用户失败");
return Err(ResErr::service("添加用户失败"));
}
};
if account.is_none() {
tracing::error!("添加用户失败");
return Err(ResErr::service("添加用户失败"));