diff --git a/ch2driver/pytpcc/drivers/nestcollectionsdriver.py b/ch2driver/pytpcc/drivers/nestcollectionsdriver.py index 8d0ee4d..08468cb 100644 --- a/ch2driver/pytpcc/drivers/nestcollectionsdriver.py +++ b/ch2driver/pytpcc/drivers/nestcollectionsdriver.py @@ -55,13 +55,6 @@ from couchbase.options import ClusterOptions, ClusterTimeoutOptions from couchbase.auth import PasswordAuthenticator -QUERY_URL = "127.0.0.1:8093" -DATA_URL = "127.0.0.1" -ANALYTICS_URL = "127.0.0.1:8095" -NUM_LOAD_RETRIES = 10 - -USER_ID = "Administrator" -PASSWORD = "password" CH2_TXN_QUERIES = { "DELIVERY": { "beginWork": "BEGIN WORK", @@ -127,415 +120,61 @@ "beginWork": "BEGIN WORK", "rollbackWork":"ROLLBACK WORK", "commitWork":"COMMIT WORK", - "getNewOrder": "SELECT no_o_id FROM default:bench.ch2.neworder WHERE no_d_id = $1 AND no_w_id = $2 AND no_o_id > -1 LIMIT 1", # - "deleteNewOrder": "DELETE FROM default:bench.ch2.neworder WHERE no_d_id = $1 AND no_w_id = $2 AND no_o_id = $3", # d_id, w_id, no_o_id - "getCId": "SELECT o_c_id FROM default:bench.ch2.orders WHERE o_id = $1 AND o_d_id = $2 AND o_w_id = $3", # no_o_id, d_id, w_id - "updateOrders": "UPDATE default:bench.ch2.orders SET o_carrier_id = $1 WHERE o_id = $2 AND o_d_id = $3 AND o_w_id = $4", # o_carrier_id, no_o_id, d_id, w_id - "updateOrderLine": "UPDATE default:bench.ch2.orders SET ol.ol_delivery_d = $1 FOR ol IN o_orderline END WHERE o_id = $2 AND o_d_id = $3 AND o_w_id = $4", # o_entry_d, no_o_id, d_id, w_id - "sumOLAmount": "SELECT VALUE (SELECT SUM(ol.ol_amount) as sum_ol_amount FROM o.o_orderline ol)[0] FROM default:bench.ch2.orders o where o.o_id = $1 and o.o_d_id = $2 and o.o_w_id = $3", - "updateCustomer": "UPDATE default:bench.ch2.customer USE KEYS [(to_string($4) || '.' || to_string($3) || '.' || to_string($2))] SET c_balance = c_balance + $1 ", # ol_total, c_id, d_id, w_id + "getNewOrder": "SELECT no_o_id FROM default:bench.ch2pp.neworder WHERE no_d_id = $1 AND no_w_id = $2 AND no_o_id > -1 LIMIT 1", # + "deleteNewOrder": "DELETE FROM default:bench.ch2pp.neworder WHERE no_d_id = $1 AND no_w_id = $2 AND no_o_id = $3", # d_id, w_id, no_o_id + "getCId": "SELECT o_c_id FROM default:bench.ch2pp.orders WHERE o_id = $1 AND o_d_id = $2 AND o_w_id = $3", # no_o_id, d_id, w_id + "updateOrders": "UPDATE default:bench.ch2pp.orders SET o_carrier_id = $1 WHERE o_id = $2 AND o_d_id = $3 AND o_w_id = $4", # o_carrier_id, no_o_id, d_id, w_id + "updateOrderLine": "UPDATE default:bench.ch2pp.orders SET ol.ol_delivery_d = $1 FOR ol IN o_orderline END WHERE o_id = $2 AND o_d_id = $3 AND o_w_id = $4", # o_entry_d, no_o_id, d_id, w_id + "sumOLAmount": "SELECT VALUE (SELECT SUM(ol.ol_amount) as sum_ol_amount FROM o.o_orderline ol)[0] FROM default:bench.ch2pp.orders o where o.o_id = $1 and o.o_d_id = $2 and o.o_w_id = $3", + "updateCustomer": "UPDATE default:bench.ch2pp.customer USE KEYS [(to_string($4) || '.' || to_string($3) || '.' || to_string($2))] SET c_balance = c_balance + $1 ", # ol_total, c_id, d_id, w_id }, "NEW_ORDER": { "beginWork": "BEGIN WORK", "rollbackWork":"ROLLBACK WORK", "commitWork":"COMMIT WORK", - "getWarehouseTaxRate": "SELECT w_tax FROM default:bench.ch2.warehouse WHERE w_id = $1", # w_id - "getDistrict": "SELECT d_tax, d_next_o_id FROM default:bench.ch2.district WHERE d_id = $1 AND d_w_id = $2", # d_id, w_id - "incrementNextOrderId": "UPDATE default:bench.ch2.district SET d_next_o_id = $1 WHERE d_id = $2 AND d_w_id = $3", # d_next_o_id, d_id, w_id - "getCustomer": "SELECT c_discount, c_name.c_last, c_credit FROM default:bench.ch2.customer USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3))]", # w_id, d_id, c_id - "createOrder": "INSERT INTO default:bench.ch2.orders (KEY, VALUE) VALUES (TO_STRING($3) || '.' || TO_STRING($2) || '.' || TO_STRING($1), {\\\"o_id\\\":$1, \\\"o_d_id\\\":$2, \\\"o_w_id\\\":$3, \\\"o_c_id\\\":$4, \\\"o_entry_d\\\":$5, \\\"o_carrier_id\\\":$6, \\\"o_ol_cnt\\\":$7, \\\"o_all_local\\\":$8})", # d_next_o_id, d_id, w_id, c_id, o_entry_d, o_carrier_id, o_ol_cnt, o_all_local - "createNewOrder": "INSERT INTO default:bench.ch2.neworder(KEY, VALUE) VALUES(TO_STRING($2)|| '.' || TO_STRING($3)|| '.' || TO_STRING($1), {\\\"no_o_id\\\":$1,\\\"no_d_id\\\":$2,\\\"no_w_id\\\":$3})", - "getItemInfo": "SELECT i_price, i_name, i_data FROM default:bench.ch2.item USE KEYS [to_string($1)]", # ol_i_id - "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dists FROM default:bench.ch2.stock USE KEYS [TO_STRING($2)|| '.' || TO_STRING($1)]", # d_id, ol_i_id, ol_supply_w_id - "updateStock": "UPDATE default:bench.ch2.stock USE KEYS [to_string($6) || '.' || to_string($5)] SET s_quantity = $1, s_ytd = $2, s_order_cnt = $3, s_remote_cnt = $4 ", # s_quantity, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id - "createOrderLine": "UPSERT INTO default:bench.ch2.orders(KEY, VALUE) VALUES(TO_STRING($3)|| '.' || TO_STRING($2)|| '.' || TO_STRING($1), { \\\"o_id\\\":$1, \\\"o_d_id\\\":$2, \\\"o_w_id\\\":$3, \\\"o_orderline\\\": [{\\\"ol_number\\\":$4, \\\"ol_i_id\\\":$5, \\\"ol_supply_w_id\\\":$6, \\\"ol_delivery_d\\\":$7, \\\"ol_quantity\\\":$8, \\\"ol_amount\\\":$9, \\\"ol_dist_info\\\":$10}]})" + "getWarehouseTaxRate": "SELECT w_tax FROM default:bench.ch2pp.warehouse WHERE w_id = $1", # w_id + "getDistrict": "SELECT d_tax, d_next_o_id FROM default:bench.ch2pp.district WHERE d_id = $1 AND d_w_id = $2", # d_id, w_id + "incrementNextOrderId": "UPDATE default:bench.ch2pp.district SET d_next_o_id = $1 WHERE d_id = $2 AND d_w_id = $3", # d_next_o_id, d_id, w_id + "getCustomer": "SELECT c_discount, c_name.c_last, c_credit FROM default:bench.ch2pp.customer USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3))]", # w_id, d_id, c_id + "createOrder": "INSERT INTO default:bench.ch2pp.orders (KEY, VALUE) VALUES (TO_STRING($3) || '.' || TO_STRING($2) || '.' || TO_STRING($1), {\\\"o_id\\\":$1, \\\"o_d_id\\\":$2, \\\"o_w_id\\\":$3, \\\"o_c_id\\\":$4, \\\"o_entry_d\\\":$5, \\\"o_carrier_id\\\":$6, \\\"o_ol_cnt\\\":$7, \\\"o_all_local\\\":$8})", # d_next_o_id, d_id, w_id, c_id, o_entry_d, o_carrier_id, o_ol_cnt, o_all_local + "createNewOrder": "INSERT INTO default:bench.ch2pp.neworder(KEY, VALUE) VALUES(TO_STRING($2)|| '.' || TO_STRING($3)|| '.' || TO_STRING($1), {\\\"no_o_id\\\":$1,\\\"no_d_id\\\":$2,\\\"no_w_id\\\":$3})", + "getItemInfo": "SELECT i_price, i_name, i_data FROM default:bench.ch2pp.item USE KEYS [to_string($1)]", # ol_i_id + "getStockInfo": "SELECT s_quantity, s_data, s_ytd, s_order_cnt, s_remote_cnt, s_dists FROM default:bench.ch2pp.stock USE KEYS [TO_STRING($2)|| '.' || TO_STRING($1)]", # d_id, ol_i_id, ol_supply_w_id + "updateStock": "UPDATE default:bench.ch2pp.stock USE KEYS [to_string($6) || '.' || to_string($5)] SET s_quantity = $1, s_ytd = $2, s_order_cnt = $3, s_remote_cnt = $4 ", # s_quantity, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id + "createOrderLine": "UPSERT INTO default:bench.ch2pp.orders(KEY, VALUE) VALUES(TO_STRING($3)|| '.' || TO_STRING($2)|| '.' || TO_STRING($1), { \\\"o_id\\\":$1, \\\"o_d_id\\\":$2, \\\"o_w_id\\\":$3, \\\"o_orderline\\\": [{\\\"ol_number\\\":$4, \\\"ol_i_id\\\":$5, \\\"ol_supply_w_id\\\":$6, \\\"ol_delivery_d\\\":$7, \\\"ol_quantity\\\":$8, \\\"ol_amount\\\":$9, \\\"ol_dist_info\\\":$10}]})" }, "ORDER_STATUS": { "beginWork": "BEGIN WORK", "rollbackWork":"ROLLBACK WORK", "commitWork":"COMMIT WORK", - "getCustomerByCustomerId": "SELECT c_id, c_name.c_first, c_name.c_middle, c_name.c_last, c_balance FROM default:bench.ch2.customer USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3)) ]", # w_id, d_id, c_id - "getCustomersByLastName": "SELECT c_id, c_name.c_first, c_name.c_middle, c_name.c_last, c_balance FROM default:bench.ch2.customer WHERE c_w_id = $1 AND c_d_id = $2 AND c_name.c_last = $3 ORDER BY c_name.c_first", # w_id, d_id, c_last - "getLastOrder": "SELECT o_id, o_carrier_id, o_entry_d FROM default:bench.ch2.orders WHERE o_w_id = $1 AND o_d_id = $2 AND o_c_id = $3 ORDER BY o_id DESC LIMIT 1", # w_id, d_id, c_id - "getOrderLines": "SELECT ol.ol_supply_w_id, ol.ol_i_id, ol.ol_quantity, ol.ol_amount, ol.ol_delivery_d FROM default:bench.ch2.orders o unnest o.o_orderline ol WHERE o.o_w_id = $1 AND o.o_d_id = $2 AND o.o_id = $3", # w_id, d_id, o_id + "getCustomerByCustomerId": "SELECT c_id, c_name.c_first, c_name.c_middle, c_name.c_last, c_balance FROM default:bench.ch2pp.customer USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3)) ]", # w_id, d_id, c_id + "getCustomersByLastName": "SELECT c_id, c_name.c_first, c_name.c_middle, c_name.c_last, c_balance FROM default:bench.ch2pp.customer WHERE c_w_id = $1 AND c_d_id = $2 AND c_name.c_last = $3 ORDER BY c_name.c_first", # w_id, d_id, c_last + "getLastOrder": "SELECT o_id, o_carrier_id, o_entry_d FROM default:bench.ch2pp.orders WHERE o_w_id = $1 AND o_d_id = $2 AND o_c_id = $3 ORDER BY o_id DESC LIMIT 1", # w_id, d_id, c_id + "getOrderLines": "SELECT ol.ol_supply_w_id, ol.ol_i_id, ol.ol_quantity, ol.ol_amount, ol.ol_delivery_d FROM default:bench.ch2pp.orders o unnest o.o_orderline ol WHERE o.o_w_id = $1 AND o.o_d_id = $2 AND o.o_id = $3", # w_id, d_id, o_id }, "PAYMENT": { "beginWork": "BEGIN WORK", "rollbackWork":"ROLLBACK WORK", "commitWork":"COMMIT WORK", - "getWarehouse": "SELECT w_name, w_address.w_street_1, w_address.w_street_2, w_address.w_city, w_address.w_state, w_address.w_zip FROM default:bench.ch2.warehouse WHERE w_id = $1", # w_id - "updateWarehouseBalance": "UPDATE default:bench.ch2.warehouse SET w_ytd = w_ytd + $1 WHERE w_id = $2", # h_amount, w_id - "getDistrict": "SELECT d_name, d_address.d_street_1, d_address.d_street_2, d_address.d_city, d_address.d_state, d_address.d_zip FROM default:bench.ch2.district WHERE d_w_id = $1 AND d_id = $2", # w_id, d_id - "updateDistrictBalance": "UPDATE default:bench.ch2.district SET d_ytd = d_ytd + $1 WHERE d_w_id = $2 AND d_id = $3", # h_amount, d_w_id, d_id - "getCustomerByCustomerId": "SELECT c.c_id, c.c_name.c_first, c.c_name.c_middle, c.c_name.c_last, ca.c_street_1, ca.c_street_2, ca.c_city, ca.c_state, ca.c_zip, cp.c_phone_number, c.c_since, c.c_credit, c.c_credit_lim, c.c_discount, c.c_balance, c.c_ytd_payment, c.c_payment_cnt, c.c_data FROM default:bench.ch2.customer AS c USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3))] UNNEST c.c_addresses AS ca UNNEST c.c_phones AS cp WHERE ca.c_address_kind = 'shipping' AND cp.cp_phone_kind = 'mobile'", # w_id, d_id, c_id - "getCustomersByLastName": "SELECT c.c_id, c.c_name.c_first, c.c_name.c_middle, c.c_name.c_last, ca.c_street_1, ca.c_street_2, ca.c_city, ca.c_state, ca.c_zip, cp.c_phone_number, c.c_since, c.c_credit, c.c_credit_lim, c.c_discount, c.c_balance, c.c_ytd_payment, c.c_payment_cnt, c.c_data FROM default:bench.ch2.customer AS c UNNEST c.c_addresses AS ca UNNEST c.c_phones AS cp WHERE ca.c_address_kind = 'shipping' AND cp.cp_phone_kind = 'mobile' AND c.c_w_id = $1 AND c.c_d_id = $2 AND c.c_last = $3 ORDER BY c.c_first", # w_id, d_id, c_last - "updateBCCustomer": "UPDATE default:bench.ch2.customer USE KEYS [(to_string($6) || '.' || to_string($6) || '.' || to_string($7)) ] SET c_balance = $1, c_ytd_payment = $2, c_payment_cnt = $3, c_data = $4 ", # c_balance, c_ytd_payment, c_payment_cnt, c_data, c_w_id, c_d_id, c_id - "updateGCCustomer": "UPDATE default:bench.ch2.customer USE KEYS [(to_string($4) || '.' || to_string($5) || '.' || to_string($6)) ] SET c_balance = $1, c_ytd_payment = $2, c_payment_cnt = $3 ", # c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id - "insertHistory": "INSERT INTO default:bench.ch2.history(KEY, VALUE) VALUES (TO_STRING($6), {\\\"h_c_id\\\":$1, \\\"h_c_d_id\\\":$2, \\\"h_c_w_id\\\":$3, \\\"h_d_id\\\":$4, \\\"h_w_id\\\":$5, \\\"h_date\\\":$6, \\\"h_amount\\\":$7, \\\"h_data\\\":$8})" + "getWarehouse": "SELECT w_name, w_address.w_street_1, w_address.w_street_2, w_address.w_city, w_address.w_state, w_address.w_zip FROM default:bench.ch2pp.warehouse WHERE w_id = $1", # w_id + "updateWarehouseBalance": "UPDATE default:bench.ch2pp.warehouse SET w_ytd = w_ytd + $1 WHERE w_id = $2", # h_amount, w_id + "getDistrict": "SELECT d_name, d_address.d_street_1, d_address.d_street_2, d_address.d_city, d_address.d_state, d_address.d_zip FROM default:bench.ch2pp.district WHERE d_w_id = $1 AND d_id = $2", # w_id, d_id + "updateDistrictBalance": "UPDATE default:bench.ch2pp.district SET d_ytd = d_ytd + $1 WHERE d_w_id = $2 AND d_id = $3", # h_amount, d_w_id, d_id + "getCustomerByCustomerId": "SELECT c.c_id, c.c_name.c_first, c.c_name.c_middle, c.c_name.c_last, ca.c_street_1, ca.c_street_2, ca.c_city, ca.c_state, ca.c_zip, cp.c_phone_number, c.c_since, c.c_credit, c.c_credit_lim, c.c_discount, c.c_balance, c.c_ytd_payment, c.c_payment_cnt, c.c_data FROM default:bench.ch2pp.customer AS c USE KEYS [(to_string($1) || '.' || to_string($2) || '.' || to_string($3))] UNNEST c.c_addresses AS ca UNNEST c.c_phones AS cp WHERE ca.c_address_kind = 'shipping' AND cp.cp_phone_kind = 'mobile'", # w_id, d_id, c_id + "getCustomersByLastName": "SELECT c.c_id, c.c_name.c_first, c.c_name.c_middle, c.c_name.c_last, ca.c_street_1, ca.c_street_2, ca.c_city, ca.c_state, ca.c_zip, cp.c_phone_number, c.c_since, c.c_credit, c.c_credit_lim, c.c_discount, c.c_balance, c.c_ytd_payment, c.c_payment_cnt, c.c_data FROM default:bench.ch2pp.customer AS c UNNEST c.c_addresses AS ca UNNEST c.c_phones AS cp WHERE ca.c_address_kind = 'shipping' AND cp.cp_phone_kind = 'mobile' AND c.c_w_id = $1 AND c.c_d_id = $2 AND c.c_last = $3 ORDER BY c.c_first", # w_id, d_id, c_last + "updateBCCustomer": "UPDATE default:bench.ch2pp.customer USE KEYS [(to_string($6) || '.' || to_string($6) || '.' || to_string($7)) ] SET c_balance = $1, c_ytd_payment = $2, c_payment_cnt = $3, c_data = $4 ", # c_balance, c_ytd_payment, c_payment_cnt, c_data, c_w_id, c_d_id, c_id + "updateGCCustomer": "UPDATE default:bench.ch2pp.customer USE KEYS [(to_string($4) || '.' || to_string($5) || '.' || to_string($6)) ] SET c_balance = $1, c_ytd_payment = $2, c_payment_cnt = $3 ", # c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id + "insertHistory": "INSERT INTO default:bench.ch2pp.history(KEY, VALUE) VALUES (TO_STRING($6), {\\\"h_c_id\\\":$1, \\\"h_c_d_id\\\":$2, \\\"h_c_w_id\\\":$3, \\\"h_d_id\\\":$4, \\\"h_w_id\\\":$5, \\\"h_date\\\":$6, \\\"h_amount\\\":$7, \\\"h_data\\\":$8})" }, "STOCK_LEVEL": { - "getOId": "SELECT d_next_o_id FROM default:bench.ch2.district WHERE d_w_id = $1 AND d_id = $2", - "getStockCount": " SELECT COUNT(DISTINCT(ol.ol_i_id)) AS cnt_ol_i_id FROM default:bench.ch2.orders o UNNEST o.o_orderline ol INNER JOIN bench.ch2.stock s ON KEYS (TO_STRING(o.o_w_id) || '.' || TO_STRING(ol.ol_i_id)) WHERE o.o_w_id = $1 AND o.o_d_id = $2 AND o.o_id < $3 AND o.o_id >= $4 AND s.s_quantity < $6 " + "getOId": "SELECT d_next_o_id FROM default:bench.ch2pp.district WHERE d_w_id = $1 AND d_id = $2", + "getStockCount": " SELECT COUNT(DISTINCT(ol.ol_i_id)) AS cnt_ol_i_id FROM default:bench.ch2pp.orders o UNNEST o.o_orderline ol INNER JOIN bench.ch2pp.stock s ON KEYS (TO_STRING(o.o_w_id) || '.' || TO_STRING(ol.ol_i_id)) WHERE o.o_w_id = $1 AND o.o_d_id = $2 AND o.o_id < $3 AND o.o_id >= $4 AND s.s_quantity < $6 " }, } -KEYNAMES = { - constants.TABLENAME_ITEM: [0], # INTEGER - constants.TABLENAME_WAREHOUSE: [0], # INTEGER - constants.TABLENAME_DISTRICT: [1, 0], # INTEGER - constants.TABLENAME_CUSTOMER: [2, 1, 0], # INTEGER - constants.TABLENAME_STOCK: [1, 0], # INTEGER - constants.TABLENAME_ORDERS: [3, 2, 0], # INTEGER - constants.TABLENAME_NEWORDER: [2, 1, 0], # INTEGER - constants.TABLENAME_ORDERLINE: [2, 1, 0, 3], # INTEGER - constants.TABLENAME_HISTORY: [2, 1, 0], # INTEGER - constants.TABLENAME_SUPPLIER: [0], # INTEGER - constants.TABLENAME_NATION: [0], # INTEGER - constants.TABLENAME_REGION: [0], # INTEGER -} -CH2_TABLE_COLUMNS = { - constants.TABLENAME_ITEM: [ - "i_id", # INTEGER - "i_name", # VARCHAR - "i_price", # FLOAT - "i_extra", # Extra unused fields - "i_categories", # ARRAY - "i_data", # VARCHAR - "i_im_id", # INTEGER - ], - constants.TABLENAME_WAREHOUSE: [ - "w_id", # SMALLINT - "w_ytd", # FLOAT - "w_tax", # FLOAT - "w_name", # VARCHAR - "w_street_1", # VARCHAR - "w_street_2", # VARCHAR - "w_city", # VARCHAR - "w_state", # VARCHAR - "w_zip", # VARCHAR - ], - constants.TABLENAME_DISTRICT: [ - "d_id", # TINYINT - "d_w_id", # SMALLINT - "d_ytd", # FLOAT - "d_tax", # FLOAT - "d_next_o_id", # INT - "d_name", # VARCHAR - "d_street_1", # VARCHAR - "d_street_2", # VARCHAR - "d_city", # VARCHAR - "d_state", # VARCHAR - "d_zip", # VARCHAR - ], - constants.TABLENAME_CUSTOMER: [ - "c_id", # INTEGER - "c_d_id", # TINYINT - "c_w_id", # SMALLINT - "c_discount", # FLOAT - "c_credit", # VARCHAR - "c_first", # VARCHAR - "c_middle", # VARCHAR - "c_last", # VARCHAR - "c_credit_lim", # FLOAT - "c_balance", # FLOAT - "c_ytd_payment", # FLOAT - "c_payment_cnt", # INTEGER - "c_delivery_cnt", # INTEGER - "c_extra", # Extra unused fields - "c_street_1", # VARCHAR - "c_street_2", # VARCHAR - "c_city", # VARCHAR - "c_state", # VARCHAR - "c_zip", # VARCHAR - "c_phone", # VARCHAR - "c_since", # TIMESTAMP - "c_item_categories", # ARRAY - "c_data", # VARCHAR - ], - constants.TABLENAME_STOCK: [ - "s_i_id", # INTEGER - "s_w_id", # SMALLINT - "s_quantity", # INTEGER - "s_ytd", # INTEGER - "s_order_cnt", # INTEGER - "s_remote_cnt", # INTEGER - "s_data", # VARCHAR - "s_dist_01", # VARCHAR - "s_dist_02", # VARCHAR - "s_dist_03", # VARCHAR - "s_dist_04", # VARCHAR - "s_dist_05", # VARCHAR - "s_dist_06", # VARCHAR - "s_dist_07", # VARCHAR - "s_dist_08", # VARCHAR - "s_dist_09", # VARCHAR - "s_dist_10", # VARCHAR - ], - constants.TABLENAME_ORDERS: [ - "o_id", # INTEGER - "o_c_id", # INTEGER - "o_d_id", # TINYINT - "o_w_id", # SMALLINT - "o_carrier_id", # INTEGER - "o_ol_cnt", # INTEGER - "o_all_local", # INTEGER - "o_entry_d", # TIMESTAMP - "o_extra", # Extra unused fields - "o_orderline", # ARRAY - ], - constants.TABLENAME_NEWORDER: [ - "no_o_id", # INTEGER - "no_d_id", # TINYINT - "no_w_id", # SMALLINT - ], - constants.TABLENAME_ORDERLINE: [ -# "ol_o_id", # INTEGER -# "ol_d_id", # TINYINT -# "ol_w_id", # SMALLINT - "ol_number", # INTEGER - "ol_i_id", # INTEGER - "ol_supply_w_id", # SMALLINT - "ol_delivery_d", # TIMESTAMP - "ol_quantity", # INTEGER - "ol_amount", # FLOAT - "ol_dist_info", # VARCHAR - ], - constants.TABLENAME_HISTORY: [ - "h_c_id", # INTEGER - "h_c_d_id", # TINYINT - "h_c_w_id", # SMALLINT - "h_d_id", # TINYINT - "h_w_id", # SMALLINT - "h_date", # TIMESTAMP - "h_amount", # FLOAT - "h_data", # VARCHAR - ], - constants.TABLENAME_SUPPLIER: [ - "su_suppkey", # INTEGER - "su_name", # VARCHAR - "su_address", # VARCHAR - "su_nationkey", # INTEGER - "su_phone", # VARCHAR - "su_acctbal", # FLOAT - "su_comment", # VARCHAR - ], - constants.TABLENAME_NATION: [ - "n_nationkey", # INTEGER - "n_name", # VARCHAR - "n_regionkey", # INTEGER - "n_comment", # VARCHAR - ], - constants.TABLENAME_REGION: [ - "r_regionkey", # INTEGER - "r_name", # VARCHAR - "r_comment", # VARCHAR - ], -} - -CH2PP_TABLE_COLUMNS = { - constants.TABLENAME_ITEM: [ - "i_id", # INTEGER - "i_name", # VARCHAR - "i_price", # FLOAT - "i_extra", # Extra unused fields - "i_categories", # ARRAY - "i_data", # VARCHAR - "i_im_id", # INTEGER - ], - constants.TABLENAME_WAREHOUSE: [ - "w_id", # SMALLINT - "w_ytd", # FLOAT - "w_tax", # FLOAT - "w_name", # VARCHAR - "w_address", # JSON - ], - constants.TABLENAME_DISTRICT: [ - "d_id", # TINYINT - "d_w_id", # SMALLINT - "d_ytd", # FLOAT - "d_tax", # FLOAT - "d_next_o_id", # INT - "d_name", # VARCHAR - "d_address", # JSON - ], - constants.TABLENAME_CUSTOMER: [ - "c_id", # INTEGER - "c_d_id", # TINYINT - "c_w_id", # SMALLINT - "c_discount", # FLOAT - "c_credit", # VARCHAR - "c_name", # JSON OBJECT - "c_credit_lim", # FLOAT - "c_balance", # FLOAT - "c_ytd_payment", # FLOAT - "c_payment_cnt", # INTEGER - "c_delivery_cnt", # INTEGER - "c_extra", # Extra unused fields - "c_addresses", # ARRAY - "c_phones", # ARRAY - "c_since", # TIMESTAMP - "c_item_categories", # ARRAY - "c_data", # VARCHAR - ], - constants.TABLENAME_STOCK: [ - "s_i_id", # INTEGER - "s_w_id", # SMALLINT - "s_quantity", # INTEGER - "s_ytd", # INTEGER - "s_order_cnt", # INTEGER - "s_remote_cnt", # INTEGER - "s_data", # VARCHAR - "s_dists", # ARRAY - ], - constants.TABLENAME_ORDERS: [ - "o_id", # INTEGER - "o_c_id", # INTEGER - "o_d_id", # TINYINT - "o_w_id", # SMALLINT - "o_carrier_id", # INTEGER - "o_ol_cnt", # INTEGER - "o_all_local", # INTEGER - "o_entry_d", # TIMESTAMP - "o_extra", # Extra unused fields - "o_orderline", # ARRAY - ], - constants.TABLENAME_NEWORDER: [ - "no_o_id", # INTEGER - "no_d_id", # TINYINT - "no_w_id", # SMALLINT - ], - constants.TABLENAME_ORDERLINE: [ -# "ol_o_id", # INTEGER -# "ol_d_id", # TINYINT -# "ol_w_id", # SMALLINT - "ol_number", # INTEGER - "ol_i_id", # INTEGER - "ol_supply_w_id", # SMALLINT - "ol_delivery_d", # TIMESTAMP - "ol_quantity", # INTEGER - "ol_amount", # FLOAT - "ol_dist_info", # VARCHAR - ], - constants.TABLENAME_HISTORY: [ - "h_c_id", # INTEGER - "h_c_d_id", # TINYINT - "h_c_w_id", # SMALLINT - "h_d_id", # TINYINT - "h_w_id", # SMALLINT - "h_date", # TIMESTAMP - "h_amount", # FLOAT - "h_data", # VARCHAR - ], - constants.TABLENAME_SUPPLIER: [ - "su_suppkey", # INTEGER - "su_name", # VARCHAR - "su_address", # JSON - "su_nationkey", # INTEGER - "su_phone", # VARCHAR - "su_acctbal", # FLOAT - "su_comment", # VARCHAR - ], - constants.TABLENAME_NATION: [ - "n_nationkey", # INTEGER - "n_name", # VARCHAR - "n_regionkey", # INTEGER - "n_comment", # VARCHAR - ], - constants.TABLENAME_REGION: [ - "r_regionkey", # INTEGER - "r_name", # VARCHAR - "r_comment", # VARCHAR - ], - constants.TABLENAME_WAREHOUSE_ADDRESS: [ - "w_street_1", # VARCHAR - "w_street_2", # VARCHAR - "w_city", # VARCHAR - "w_state", # VARCHAR - "w_zip", # VARCHAR - ], - constants.TABLENAME_DISTRICT_ADDRESS: [ - "d_street_1", # VARCHAR - "d_street_2", # VARCHAR - "d_city", # VARCHAR - "d_state", # VARCHAR - "d_zip", # VARCHAR - ], - constants.TABLENAME_CUSTOMER_NAME: [ - "c_first", # VARCHAR - "c_middle", # VARCHAR - "c_last", # VARCHAR - ], - constants.TABLENAME_CUSTOMER_ADDRESSES: [ - "c_address_kind", # VARCHAR - "c_street_1", # VARCHAR - "c_street_2", # VARCHAR - "c_city", # VARCHAR - "c_state", # VARCHAR - "c_zip", # VARCHAR - ], - constants.TABLENAME_CUSTOMER_PHONES: [ - "c_phone_kind", # VARCHAR - "c_phone_number", # VARCHAR - ], - constants.TABLENAME_SUPPLIER_ADDRESS: [ - "su_street_1", # VARCHAR - "su_street_2", # VARCHAR - "su_city", # VARCHAR - "su_state", # VARCHAR - "su_zip", # VARCHAR - ], -} - -TABLE_INDEXES = { - constants.TABLENAME_ITEM: [ - "i_id", - ], - constants.TABLENAME_WAREHOUSE: [ - "w_id", - ], - constants.TABLENAME_DISTRICT: [ - "d_id", - "d_w_id", - ], - constants.TABLENAME_CUSTOMER: [ - "c_id", - "c_d_id", - "c_w_id", - ], - constants.TABLENAME_STOCK: [ - "s_i_id", - "s_w_id", - ], - constants.TABLENAME_ORDERS: [ - "o_id", - "o_d_id", - "o_w_id", - "o_c_id", - ], - constants.TABLENAME_NEWORDER: [ - "no_o_id", - "no_d_id", - "no_w_id", - ], - constants.TABLENAME_ORDERLINE: [ - "ol_o_id", - "ol_d_id", - "ol_w_id", - ], - constants.TABLENAME_SUPPLIER: [ - "su_suppkey", - ], - constants.TABLENAME_NATION: [ - "n_nationkey", - ], - constants.TABLENAME_REGION: [ - "r_regionkey", - ], -} - globpool = None gcreds = '[{"user":"' + os.environ["USER_ID"] + '","pass":"' + os.environ["PASSWORD"] + '"}]' @@ -549,7 +188,7 @@ def pysdk_init(self): endpoint = 'couchbases://{}?ssl=no_verify'.format(str_data_node) cluster = Cluster(endpoint, cluster_opts) bucket = cluster.bucket(constants.CH2_BUCKET) - scope = bucket.scope(constants.CH2_SCOPE) + scope = bucket.scope(self.schema) self.collections = {} for tableName in constants.ALL_TABLES: self.collections[tableName] = scope.collection(constants.COLLECTIONS_DICT[tableName]) @@ -623,7 +262,6 @@ def doQueryParam(query, param, txid, randomhost): else: qparam.append(p) stmt['args'] = json.JSONEncoder().encode(qparam) - body = n1ql_load(randomhost, stmt) return retvalN1QLQuery("", body) @@ -692,7 +330,7 @@ def n1ql_load(query_node, stmt): url = "{}{}/query/service".format(protocol, query_node) ## RETRY LOGIC ADDED FOR LOAD - for i in range(NUM_LOAD_RETRIES): + for i in range(constants.NUM_LOAD_RETRIES): try: response = globpool.request('POST', url, fields=stmt, encode_multipart=False) response.read(cache_content=False) @@ -795,21 +433,24 @@ def __init__(self, ddl, clientId, TAFlag="T", txnQueries = CH2PP_TXN_QUERIES for txn, queries in txnQueries.items(): for query, statement in queries.items(): + stmt = statement + if self.schema == constants.CH2_DRIVER_SCHEMA["CH2P"]: + stmt = re.sub("default:bench.ch2pp.", "default:bench.ch2p.", statement) if query == "getStockInfo": for i in range(1,11): if self.schema == constants.CH2_DRIVER_SCHEMA["CH2"]: - converted_district = statement % i + converted_district = stmt % i else: - converted_district = statement - prepare_query = "PREPARE %s_%s_%s " % (txn, i, query) + "FROM %s" % converted_district - stmt = json.loads('{"statement" : "' + str(prepare_query) + '"}') - body = n1ql_execute(self.query_node, stmt) - preparedTransactionQueries[txn + str(i) + query] = body['results'][0]['name'] + converted_district = stmt + prepare_query = "PREPARE %s_%s_%s_%s " % (self.schema, txn, i, query) + "FROM %s" % converted_district + jsonStmt = json.loads('{"statement" : "' + str(prepare_query) + '"}') + body = n1ql_execute(self.query_node, jsonStmt) + preparedTransactionQueries[self.schema + txn + str(i) + query] = body['results'][0]['name'] else: - prepare_query = "PREPARE %s_%s " % (txn, query) + "FROM %s" % statement - stmt = json.loads('{"statement" : "' + str(prepare_query) + '"}') - body = n1ql_execute(self.query_node, stmt) - preparedTransactionQueries[txn + query] = body['results'][0]['name'] + prepare_query = "PREPARE %s_%s_%s " % (self.schema, txn, query) + "FROM %s" % stmt + jsonStmt = json.loads('{"statement" : "' + str(prepare_query) + '"}') + body = n1ql_execute(self.query_node, jsonStmt) + preparedTransactionQueries[self.schema + txn + query] = body['results'][0]['name'] self.prepared_dict = preparedTransactionQueries # wait prepare statements populate other query nodes @@ -831,14 +472,13 @@ def loadConfig(self, config): # Add bucket creation here. For now, simply create manually and load. self.database = "tpcc" self.denormalize = config['denormalize'] - if self.denormalize: logging.debug("Using denormalized data model") return def txStatus(self): return self.tx_status def tryDataSvcBulkLoad(self, collection, cur_batch): - for i in range(NUM_LOAD_RETRIES): + for i in range(constants.NUM_LOAD_RETRIES): try: result = collection.upsert_multi(cur_batch) if result.all_ok == True: @@ -853,11 +493,11 @@ def tryDataSvcBulkLoad(self, collection, cur_batch): logging.debug(f'Exception info: {exc_info[1]}\nTraceback:\n{tb}') time.sleep(1) - logging.debug("Client ID # %d failed bulk load data into KV after %d retries" % (self.client_id, NUM_LOAD_RETRIES)) + logging.debug("Client ID # %d failed bulk load data into KV after %d retries" % (self.client_id, constants.NUM_LOAD_RETRIES)) return False def tryDataSvcLoad(self, collection, key, val): - for i in range(NUM_LOAD_RETRIES): + for i in range(constants.NUM_LOAD_RETRIES): try: result = collection.upsert(key, val) if result.success == True: @@ -872,7 +512,7 @@ def tryDataSvcLoad(self, collection, key, val): logging.debug(f'Exception info: {exc_info[1]}\nTraceback:\n{tb}') time.sleep(1) - logging.debug("Client ID # %d failed load data into KV after %d attempts" % (self.client_id, NUM_LOAD_RETRIES)) + logging.debug("Client ID # %d failed load data into KV after %d attempts" % (self.client_id, constants.NUM_LOAD_RETRIES)) return False ## ---------------------------------------------- @@ -883,7 +523,7 @@ def loadTuples(self, tableName, tuples): return logging.debug("Loading %d tuples for tableName %s" % (len(tuples), tableName)) - assert tableName in CH2_TABLE_COLUMNS, "Unexpected table %s" % tableName + assert tableName in constants.CH2_TABLE_COLUMNS, "Unexpected table %s" % tableName if (self.load_mode == constants.CH2_DRIVER_LOAD_MODE["DATASVC_BULKLOAD"] or self.load_mode == constants.CH2_DRIVER_LOAD_MODE["DATASVC_LOAD"]): @@ -893,7 +533,7 @@ def loadTuples(self, tableName, tuples): cur_batch = {} cur_size = 0 for t in tuples: - key, val = self.getOneDoc(tableName, t, False) + key, val = self.getOneDoc(tableName, t, True) cur_batch[key] = val cur_size += len(key) + len(val) + 24 # 24 bytes of overhead if cur_size > self.bulkload_batch_size: @@ -912,7 +552,7 @@ def loadTuples(self, tableName, tuples): #self.load_mode == constants.CH2_DRIVER_LOAD_MODE["DATASVC_LOAD"] # Load one document at a time for t in tuples: - key, val = self.getOneDoc(tableName, t, False) + key, val = self.getOneDoc(tableName, t, True) result = self.tryDataSvcLoad(collection, key, val) if result == True: continue @@ -922,11 +562,11 @@ def loadTuples(self, tableName, tuples): elif self.load_mode == constants.CH2_DRIVER_LOAD_MODE["QRYSVC_LOAD"]: for t in tuples: args = [] - args.append(tableName) + args.append(constants.CH2_NAMESPACE + ":" + constants.CH2_BUCKET + "." + self.schema + "."+ tableName) args.append("") args.append("") args.append({}) - key, val = self.getOneDoc(tableName, t, False) + key, val = self.getOneDoc(tableName, t, True) args[1] = key args[2] = val @@ -937,148 +577,13 @@ def loadTuples(self, tableName, tuples): return - def getOneDoc(self, tableName, tuple, denorm): - if self.schema == constants.CH2_DRIVER_SCHEMA["CH2"]: - return self.getOneCH2Doc(tableName, tuple, denorm) - else: - return self.getOneCH2PPDoc(tableName, tuple, denorm) - - def getOneCH2Doc(self, tableName, tuple, denorm): - columns = CH2_TABLE_COLUMNS[tableName] - key = "" - if denorm: - for l, k in enumerate(KEYNAMES[tableName]): - if l == 0: - key = str(tuple[columns[k]]) - else: - key = key + '.' + str(tuple[columns[k]]) - val = tuple - else: - for l, k in enumerate(KEYNAMES[tableName]): - if l == 0: - key = str(tuple[k]) - else: - key = key + '.' + str(tuple[k]) - val = {} - for l, v in enumerate(tuple): - v1 = tuple[l] - if tableName == constants.TABLENAME_ORDERS and columns[l] == "o_orderline": - v1 = [] - for olv in v: - v1.append(self.genNestedTuple(olv, constants.TABLENAME_ORDERLINE)) - elif (tableName == constants.TABLENAME_ITEM and columns[l] == "i_categories" or - tableName == constants.TABLENAME_CUSTOMER and columns[l] == "c_item_categories"): - continue - elif tableName == constants.TABLENAME_CUSTOMER and columns[l] == "c_extra": - for i in range(0, self.customerExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - elif tableName == constants.TABLENAME_ORDERS and columns[l] == "o_extra": - for i in range(0, self.ordersExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - elif tableName == constants.TABLENAME_ITEM and columns[l] == "i_extra": - for i in range(0, self.itemExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - elif isinstance(v1,(datetime)): - v1 = str(v1) - val[columns[l]] = v1 - - return key, val - - def getOneCH2PPDoc(self, tableName, tuple, denorm): - columns = CH2PP_TABLE_COLUMNS[tableName] - key = "" - if denorm: - for l, k in enumerate(KEYNAMES[tableName]): - if l == 0: - key = str(tuple[columns[k]]) - else: - key = key + '.' + str(tuple[columns[k]]) - val = tuple - else: - for l, k in enumerate(KEYNAMES[tableName]): - if l == 0: - key = str(tuple[k]) - else: - key = key + '.' + str(tuple[k]) - val = {} - for l, v in enumerate(tuple): - v1 = tuple[l] - if isinstance(v1,(datetime)): - v1 = str(v1) - elif tableName == constants.TABLENAME_ORDERS and columns[l] == "o_orderline": - v1 = [] - for olv in v: - v1.append(self.genNestedTuple(olv, constants.TABLENAME_ORDERLINE)) - elif (self.schema == constants.CH2_DRIVER_SCHEMA["CH2P"] and - (tableName == constants.TABLENAME_ITEM and columns[l] == "i_categories" or - tableName == constants.TABLENAME_CUSTOMER and columns[l] == "c_item_categories")): - continue - elif tableName == constants.TABLENAME_WAREHOUSE and columns[l] == "w_address": - v1 = self.genNestedTuple(v, constants.TABLENAME_WAREHOUSE_ADDRESS) - elif tableName == constants.TABLENAME_DISTRICT and columns[l] == "d_address": - v1 = self.genNestedTuple(v, constants.TABLENAME_DISTRICT_ADDRESS) - elif tableName == constants.TABLENAME_SUPPLIER and columns[l] == "su_address": - v1 = self.genNestedTuple(v, constants.TABLENAME_SUPPLIER_ADDRESS) - elif tableName == constants.TABLENAME_CUSTOMER: - if columns[l] == "c_name": - v1 = self.genNestedTuple(v, constants.TABLENAME_CUSTOMER_NAME) - elif columns[l] == "c_extra": - for i in range(0, self.customerExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - elif columns[l] == "c_addresses": - v1 = [] - for clv in v: - v1.append(self.genNestedTuple(clv, constants.TABLENAME_CUSTOMER_ADDRESSES)) - if self.schema == constants.CH2_DRIVER_SCHEMA["CH2P"]: - break # Load only one customer address for CH2P - elif columns[l] == "c_phones": - v1 = [] - for clv in v: - v1.append(self.genNestedTuple(clv, constants.TABLENAME_CUSTOMER_PHONES)) - if self.schema == constants.CH2_DRIVER_SCHEMA["CH2P"]: - break # Load only one customer phone for CH2P - elif tableName == constants.TABLENAME_ORDERS and columns[l] == "o_extra": - for i in range(0, self.ordersExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - elif tableName == constants.TABLENAME_ITEM and columns[l] == "i_extra": - for i in range(0, self.itemExtraFields): - val[columns[l]+"_"+str(format(i+1, "03d"))] = v1[i] - continue - val[columns[l]] = v1 - return key, val - - def genNestedTuple(self, tuple, tableName): - if self.schema == constants.CH2_DRIVER_SCHEMA["CH2"]: - columns = CH2_TABLE_COLUMNS[tableName] - else: - columns = CH2PP_TABLE_COLUMNS[tableName] - rval = {} - for l, v in enumerate(tuple): - if isinstance(v,(datetime)): - v = str(v) - rval[columns[l]] = v - return rval - ## ---------------------------------------------- ## loadFinish ## ---------------------------------------------- def loadFinish(self): logging.info("Client ID # %d Finished loading tables" % (self.client_id)) - if logging.getLogger().isEnabledFor(logging.DEBUG): - for name in constants.ALL_TABLES: - if self.denormalize and name in NestCollectionsDriver.DENORMALIZED_TABLES[1:]: return - #logging.debug("%-12s%d records" % (name+":", self.database[name].count())) - #Nothing to commit for N1QL - return - - ## ---------------------------------------------- ## doDelivery ## ---------------------------------------------- @@ -1094,34 +599,34 @@ def doDelivery(self, params): result = [ ] for d_id in range(1, constants.DISTRICTS_PER_WAREHOUSE+1): - rs1, status = runNQuery("begin", self.prepared_dict[ txn + "beginWork"],"",self.delivery_txtimeout, randomhost) + rs1, status = runNQuery("begin", self.prepared_dict[ self.schema + txn + "beginWork"],"",self.delivery_txtimeout, randomhost) txid = rs1[0]['txid'] - newOrder, status = runNQueryParam(self.prepared_dict[ txn + "getNewOrder"], [d_id, w_id], txid, randomhost) + newOrder, status = runNQueryParam(self.prepared_dict[ self.schema +txn + "getNewOrder"], [d_id, w_id], txid, randomhost) if len(newOrder) == 0: assert len(newOrder) > 0 ## No orders for this district: skip it. Note: This must be reported if > 1% continue no_o_id = newOrder[0]['no_o_id'] - rs, status = runNQueryParam(self.prepared_dict[ txn + "getCId"], [no_o_id, d_id, w_id],txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCId"], [no_o_id, d_id, w_id],txid, randomhost) if (status != "success"): continue c_id = rs[0]['o_c_id'] - rs2,status = runNQueryParam(self.prepared_dict[ txn + "sumOLAmount"], [no_o_id, d_id, w_id], txid, randomhost) + rs2,status = runNQueryParam(self.prepared_dict[self.schema + txn + "sumOLAmount"], [no_o_id, d_id, w_id], txid, randomhost) if (status != "success"): continue ol_total = rs2[0]['sum_ol_amount'] - result,status = runNQueryParam(self.prepared_dict[ txn + "deleteNewOrder"], [d_id, w_id, no_o_id], txid, randomhost) + result,status = runNQueryParam(self.prepared_dict[self.schema + txn + "deleteNewOrder"], [d_id, w_id, no_o_id], txid, randomhost) if (status != "success"): continue - result,status = runNQueryParam(self.prepared_dict[ txn + "updateOrders"], [o_carrier_id, no_o_id, d_id, w_id], txid, randomhost) + result,status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateOrders"], [o_carrier_id, no_o_id, d_id, w_id], txid, randomhost) if (status != "success"): continue - result,status = runNQueryParam(self.prepared_dict[ txn + "updateOrderLine"], [ol_delivery_d, no_o_id, d_id, w_id], txid, randomhost) + result,status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateOrderLine"], [ol_delivery_d, no_o_id, d_id, w_id], txid, randomhost) if (status != "success"): continue @@ -1132,12 +637,12 @@ def doDelivery(self, params): # assert ol_total != None, "ol_total is NULL: there are no order lines. This should not happen" # assert ol_total > 0.0 - result,status = runNQueryParam(self.prepared_dict[ txn + "updateCustomer"], [ol_total, c_id, d_id, w_id], txid, randomhost) + result,status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateCustomer"], [ol_total, c_id, d_id, w_id], txid, randomhost) if (status != "success"): continue result.append((d_id, no_o_id)) - trs, self.tx_status = runNQuery("commit", self.prepared_dict[ txn + "commitWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("commit", self.prepared_dict[self.schema + txn + "commitWork"],txid,"",randomhost) ## FOR return result @@ -1168,15 +673,15 @@ def doNewOrder(self, params): all_local = True items = [ ] - rs, tstatus = runNQuery("begin", self.prepared_dict[ txn + "beginWork"],"",self.txtimeout, randomhost) + rs, tstatus = runNQuery("begin", self.prepared_dict[self.schema + txn + "beginWork"],"",self.txtimeout, randomhost) txid = rs[0]['txid'] #print txid for i in range(len(i_ids)): ## Determine if this is an all local order or not all_local = all_local and i_w_ids[i] == w_id - rs, status = runNQueryParam(self.prepared_dict[ txn + "getItemInfo"], [i_ids[i]], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getItemInfo"], [i_ids[i]], txid, randomhost) if len(rs) == 0: - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert len(rs) > 0 @@ -1188,27 +693,27 @@ def doNewOrder(self, params): for item in items: if len(item) == 0: ## TODO Abort here! - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return ## FOR ## ---------------- ## Collect Information from WAREHOUSE, DISTRICT, and CUSTOMER ## ---------------- - rs, status = runNQueryParam(self.prepared_dict[ txn + "getWarehouseTaxRate"], [w_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getWarehouseTaxRate"], [w_id], txid, randomhost) customer_info = rs if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return if len(rs) > 0: w_tax = rs[0]['w_tax'] - district_info, status = runNQueryParam(self.prepared_dict[ txn +"getDistrict"], [d_id, w_id], txid, randomhost) + district_info, status = runNQueryParam(self.prepared_dict[self.schema + txn +"getDistrict"], [d_id, w_id], txid, randomhost) if len(district_info) != 0: d_tax = district_info[0]['d_tax'] d_next_o_id = district_info[0]['d_next_o_id'] - rs, status = runNQueryParam(self.prepared_dict[ txn + "getCustomer"], [w_id, d_id, c_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomer"], [w_id, d_id, c_id], txid, randomhost) if len(rs) != 0: c_discount = rs[0]['c_discount'] @@ -1218,19 +723,19 @@ def doNewOrder(self, params): ol_cnt = len(i_ids) o_carrier_id = constants.NULL_CARRIER_ID - rs, status = runNQueryParam(self.prepared_dict[ txn + "incrementNextOrderId"], [d_next_o_id + 1, d_id, w_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "incrementNextOrderId"], [d_next_o_id + 1, d_id, w_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - rs, status = runNQueryParam(self.prepared_dict[ txn + "createOrder"], [d_next_o_id, d_id, w_id, c_id, o_entry_d, o_carrier_id, ol_cnt, all_local], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "createOrder"], [d_next_o_id, d_id, w_id, c_id, o_entry_d, o_carrier_id, ol_cnt, all_local], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - rs,status = runNQueryParam(self.prepared_dict[ txn + "createNewOrder"], [d_next_o_id, d_id, w_id], txid, randomhost) + rs,status = runNQueryParam(self.prepared_dict[self.schema + txn + "createNewOrder"], [d_next_o_id, d_id, w_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return #print "NewOrder Stage #1" @@ -1254,9 +759,9 @@ def doNewOrder(self, params): i_price = itemInfo["i_price"] # print "NewOrder Stage #3" - stockInfo, status = runNQueryParam(self.prepared_dict[ txn + str(d_id) + "getStockInfo"], [ol_i_id, ol_supply_w_id], txid, randomhost) + stockInfo, status = runNQueryParam(self.prepared_dict[self.schema + txn + str(d_id) + "getStockInfo"], [ol_i_id, ol_supply_w_id], txid, randomhost) if len(stockInfo) == 0: - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) logging.warn("No STOCK record for (ol_i_id=%d, ol_supply_w_id=%d)" % (ol_i_id, ol_supply_w_id)) return @@ -1289,9 +794,9 @@ def doNewOrder(self, params): if ol_supply_w_id != w_id: s_remote_cnt += 1 # print "NewOrder Stage #5" - rs, status = runNQueryParam(self.prepared_dict[ txn + "updateStock"], [s_quantity, s_ytd, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateStock"], [s_quantity, s_ytd, s_order_cnt, s_remote_cnt, ol_i_id, ol_supply_w_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return if i_data.find(constants.ORIGINAL_STRING) != -1 and s_data.find(constants.ORIGINAL_STRING) != -1: @@ -1303,13 +808,13 @@ def doNewOrder(self, params): ol_amount = ol_quantity * i_price total += ol_amount - rs, status = runNQueryParam(self.prepared_dict[ txn + "createOrderLine"], [d_next_o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, o_entry_d, ol_quantity, ol_amount, s_dist_xx], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "createOrderLine"], [d_next_o_id, d_id, w_id, ol_number, ol_i_id, ol_supply_w_id, o_entry_d, ol_quantity, ol_amount, s_dist_xx], txid, randomhost) ## Add the info to be returned item_data.append( (i_name, s_quantity, brand_generic, i_price, ol_amount) ) ## FOR - trs, self.tx_status = runNQuery("commit", self.prepared_dict[ txn + "commitWork"], txid, "",randomhost) + trs, self.tx_status = runNQuery("commit", self.prepared_dict[self.schema + txn + "commitWork"], txid, "",randomhost) ## Adjust the total for the discount #print "c_discount:", c_discount, type(c_discount) @@ -1341,21 +846,21 @@ def doOrderStatus(self, params): assert w_id, pformat(params) assert d_id, pformat(params) - rs, tstatus = runNQuery("begin", self.prepared_dict[ txn + "beginWork"],"",self.txtimeout, randomhost) + rs, tstatus = runNQuery("begin", self.prepared_dict[self.schema + txn + "beginWork"],"",self.txtimeout, randomhost) txid = rs[0]['txid'] if c_id != None: - customerlist,status = runNQueryParam(self.prepared_dict[ txn + "getCustomerByCustomerId"], [w_id, d_id, c_id], txid, randomhost) + customerlist,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomerByCustomerId"], [w_id, d_id, c_id], txid, randomhost) if len(customerlist) == 0 : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert len(customerlist) > 0 customer = customerlist[0] else: # Get the midpoint customer's id - all_customers,status = runNQueryParam(self.prepared_dict[ txn + "getCustomersByLastName"], [w_id, d_id, c_last], txid, randomhost) + all_customers,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomersByLastName"], [w_id, d_id, c_last], txid, randomhost) if len(all_customers) == 0 : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert len(all_customers) > 0 @@ -1365,17 +870,17 @@ def doOrderStatus(self, params): c_id = customer['c_id'] if len(customer) == 0 or c_id == None : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert (len(customer) > 0 or c_id != None) - order,status = runNQueryParam(self.prepared_dict[ txn + "getLastOrder"], [w_id, d_id, c_id], txid, randomhost) + order,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getLastOrder"], [w_id, d_id, c_id], txid, randomhost) if len(order) > 0: - orderLines,status = runNQueryParam(self.prepared_dict[ txn + "getOrderLines"], [w_id, d_id, order[0]['o_id']], txid, randomhost) + orderLines,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getOrderLines"], [w_id, d_id, order[0]['o_id']], txid, randomhost) else: orderLines = [ ] - trs, self.tx_status = runNQuery("commit", self.prepared_dict[ txn + "commitWork"], txid, "",randomhost) + trs, self.tx_status = runNQuery("commit", self.prepared_dict[self.schema + txn + "commitWork"], txid, "",randomhost) #Keshav: self.conn.commit() return [ customer, order, orderLines ] @@ -1399,13 +904,13 @@ def doPayment(self, params): c_last = params["c_last"] h_date = params["h_date"] - rs, tstatus = runNQuery("begin", self.prepared_dict[ txn + "beginWork"],"",self.txtimeout, randomhost) + rs, tstatus = runNQuery("begin", self.prepared_dict[self.schema + txn + "beginWork"],"",self.txtimeout, randomhost) txid = rs[0]['txid'] if c_id != None: - customerlist,status = runNQueryParam(self.prepared_dict[ txn + "getCustomerByCustomerId"], [w_id, d_id, c_id], txid, randomhost) + customerlist,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomerByCustomerId"], [w_id, d_id, c_id], txid, randomhost) if len(customerlist) == 0 : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert len(customerlist) > 0 @@ -1413,9 +918,9 @@ def doPayment(self, params): customer = customerlist[0] else: # Get the midpoint customer's id - all_customers,status = runNQueryParam(self.prepared_dict[ txn + "getCustomersByLastName"], [w_id, d_id, c_last], txid, randomhost) + all_customers,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomersByLastName"], [w_id, d_id, c_last], txid, randomhost) if len(all_customers) == 0 : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert len(all_customers) > 0 @@ -1426,7 +931,7 @@ def doPayment(self, params): c_id = customer['c_id'] if len(customer) == 0 or c_id == None : - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) self.tx_status = "assert" return assert (len(customer) > 0 or c_id != None) @@ -1438,24 +943,24 @@ def doPayment(self, params): #print "doPayment: Stage 2" - warehouse,status = runNQueryParam(self.prepared_dict[ txn + "getWarehouse"], [w_id], txid, randomhost) + warehouse,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getWarehouse"], [w_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - district,status = runNQueryParam(self.prepared_dict[ txn + "getDistrict"], [w_id, d_id], txid, randomhost) + district,status = runNQueryParam(self.prepared_dict[self.schema + txn + "getDistrict"], [w_id, d_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - rs, status = runNQueryParam(self.prepared_dict[ txn + "updateWarehouseBalance"], [h_amount, w_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateWarehouseBalance"], [h_amount, w_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - rs, status = runNQueryParam(self.prepared_dict[ txn + "updateDistrictBalance"], [h_amount, w_id, d_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateDistrictBalance"], [h_amount, w_id, d_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return #print "doPayment: Stage3" @@ -1465,15 +970,15 @@ def doPayment(self, params): newData = " ".join(map(str, [c_id, c_d_id, c_w_id, d_id, w_id, h_amount])) c_data = (newData + "|" + c_data) if len(c_data) > constants.MAX_C_DATA: c_data = c_data[:constants.MAX_C_DATA] - rs, status = runNQueryParam(self.prepared_dict[ txn + "updateBCCustomer"], [c_balance, c_ytd_payment, c_payment_cnt, c_data, c_w_id, c_d_id, c_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateBCCustomer"], [c_balance, c_ytd_payment, c_payment_cnt, c_data, c_w_id, c_d_id, c_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return else: c_data = "" - rs, status = runNQueryParam(self.prepared_dict[ txn + "updateGCCustomer"], [c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "updateGCCustomer"], [c_balance, c_ytd_payment, c_payment_cnt, c_w_id, c_d_id, c_id], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return #print "doPayment: Stage4" @@ -1482,12 +987,12 @@ def doPayment(self, params): # print "district %s" % (str(district)) h_data = "%s %s" % (warehouse[0]['w_name'], district[0]['d_name']) # Create the history record - rs, status = runNQueryParam(self.prepared_dict[ txn + "insertHistory"], [c_id, c_d_id, c_w_id, d_id, w_id, h_date, h_amount, h_data], txid, randomhost) + rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "insertHistory"], [c_id, c_d_id, c_w_id, d_id, w_id, h_date, h_amount, h_data], txid, randomhost) if (status != "success"): - trs, self.tx_status = runNQuery("rollback", self.prepared_dict[ txn + "rollbackWork"],txid,"",randomhost) + trs, self.tx_status = runNQuery("rollback", self.prepared_dict[self.schema + txn + "rollbackWork"],txid,"",randomhost) return - trs, self.tx_status = runNQuery("commit", self.prepared_dict[ txn + "commitWork"], txid,"",randomhost) + trs, self.tx_status = runNQuery("commit", self.prepared_dict[self.schema + txn + "commitWork"], txid,"",randomhost) #Keshav: self.conn.commit() # TPC-C 2.5.3.3: Must display the following fields: @@ -1517,17 +1022,17 @@ def doStockLevel(self, params): #rs = runNQuery("BEGIN WORK","",self.stock_txtimeout, randomhost) #txid = rs[0]['txid'] - result, self.tx_status = runNQueryParam(self.prepared_dict[ txn + "getOId"], [w_id, d_id],"", randomhost) + result, self.tx_status = runNQueryParam(self.prepared_dict[self.schema + txn + "getOId"], [w_id, d_id],"", randomhost) assert result o_id = result[0]['d_next_o_id'] - result, self.tx_status = runNQueryParam(self.prepared_dict[ txn + "getStockCount"], [w_id, d_id, o_id, (o_id - 20), w_id, threshold], "", randomhost) + result, self.tx_status = runNQueryParam(self.prepared_dict[self.schema + txn + "getStockCount"], [w_id, d_id, o_id, (o_id - 20), w_id, threshold], "", randomhost) #self.conn.commit() - #rs, status = runNQueryParam(self.prepared_dict[ txn + "getCustomerOrdersByDistrict"], [d_id], "", randomhost) - #rs, status = runNQueryParam(self.prepared_dict[ txn + "getOrdersByDistrict"], [d_id], "", randomhost) + #rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getCustomerOrdersByDistrict"], [d_id], "", randomhost) + #rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + "getOrdersByDistrict"], [d_id], "", randomhost) #Taking too long with 10Warehouses.So disabling - #rs, status = runNQueryParam(self.prepared_dict[ txn + 'ansigetStockCount'], [w_id, d_id, o_id, (o_id - 20), w_id, threshold], txid, randomhost) + #rs, status = runNQueryParam(self.prepared_dict[self.schema + txn + 'ansigetStockCount'], [w_id, d_id, o_id, (o_id - 20), w_id, threshold], txid, randomhost) #runNQuery("COMMIT WORK", txid, "", randomhost) return int(result[0]['cnt_ol_i_id']) @@ -1570,6 +1075,8 @@ def runCH2Queries(self, duration, endBenchmarkTime, queryIterNum): start = time.time() startTime = time.strftime("%H:%M:%S", time.localtime(start)) + stTime = int(time.time() * 1000) + stTime = int(time.time_ns() / 1000000) # In benchmark run mode, if the duration has elapsed, stop executing queries if duration is not None: @@ -1577,8 +1084,10 @@ def runCH2Queries(self, duration, endBenchmarkTime, queryIterNum): logging.debug("%s started at: %s (started after the duration of the benchmark)" % (query_id_str, startTime)) break - logging.info("%s started at: %s" % (query_id_str, startTime)) + logging.info("%s started at: %s %d" % (query_id_str, startTime, stTime)) body = n1ql_execute(self.analytics_node, stmt, 0) + eTime = int(time.time() * 1000) + eTime = int(time.time_ns() / 1000000) end = time.time() endTime = time.strftime("%H:%M:%S", time.localtime(end)) @@ -1588,7 +1097,7 @@ def runCH2Queries(self, duration, endBenchmarkTime, queryIterNum): logging.debug("%s ended at: %s (ended after the duration of the benchmark)" % (query_id_str, endTime)) break - logging.info("%s ended at: %s" % (query_id_str, endTime)) + logging.info("%s ended at: %s %d" % (query_id_str, endTime, eTime)) logging.info("%s metrics: %s" % (query_id_str, body.get("metrics"))) qry_times[qry] = [ @@ -1601,3 +1110,6 @@ def runCH2Queries(self, duration, endBenchmarkTime, queryIterNum): return qry_times ## CLASS + + +