Skip to content

Commit

Permalink
Add support for cross bucket replication feature (#12562)
Browse files Browse the repository at this point in the history
[upstream:bcbb0985ec7afaee4687d61e685cf7e92ba30cdc]

Signed-off-by: Modular Magician <[email protected]>
  • Loading branch information
modular-magician committed Dec 26, 2024
1 parent 9e4e7bb commit d4194e5
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .changelog/12562.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
storagetransfer: add support for `replication_spec` fields to `google_storage_transfer_job` resource
```
138 changes: 125 additions & 13 deletions google/services/storagetransfer/resource_storage_transfer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,35 @@ import (
)

var (
objectConditionsKeys = []string{
transferSpecObjectConditionsKeys = []string{
"transfer_spec.0.object_conditions.0.min_time_elapsed_since_last_modification",
"transfer_spec.0.object_conditions.0.max_time_elapsed_since_last_modification",
"transfer_spec.0.object_conditions.0.include_prefixes",
"transfer_spec.0.object_conditions.0.exclude_prefixes",
"transfer_spec.0.object_conditions.0.last_modified_since",
"transfer_spec.0.object_conditions.0.last_modified_before",
}
replicationSpecObjectConditionsKeys = []string{
"replication_spec.0.object_conditions.0.min_time_elapsed_since_last_modification",
"replication_spec.0.object_conditions.0.max_time_elapsed_since_last_modification",
"replication_spec.0.object_conditions.0.include_prefixes",
"replication_spec.0.object_conditions.0.exclude_prefixes",
"replication_spec.0.object_conditions.0.last_modified_since",
"replication_spec.0.object_conditions.0.last_modified_before",
}

transferOptionsKeys = []string{
transferSpecTransferOptionsKeys = []string{
"transfer_spec.0.transfer_options.0.overwrite_objects_already_existing_in_sink",
"transfer_spec.0.transfer_options.0.delete_objects_unique_in_sink",
"transfer_spec.0.transfer_options.0.delete_objects_from_source_after_transfer",
"transfer_spec.0.transfer_options.0.overwrite_when",
}
replicationSpecTransferOptionsKeys = []string{
"replication_spec.0.transfer_options.0.overwrite_objects_already_existing_in_sink",
"replication_spec.0.transfer_options.0.delete_objects_unique_in_sink",
"replication_spec.0.transfer_options.0.delete_objects_from_source_after_transfer",
"replication_spec.0.transfer_options.0.overwrite_when",
}

transferSpecDataSourceKeys = []string{
"transfer_spec.0.gcs_data_source",
Expand All @@ -49,6 +63,14 @@ var (
"transfer_spec.0.gcs_data_sink",
"transfer_spec.0.posix_data_sink",
}

replicationSpecDataSourceKeys = []string{
"replication_spec.0.gcs_data_source",
}
replicationSpecDataSinkKeys = []string{
"replication_spec.0.gcs_data_sink",
}

awsS3AuthKeys = []string{
"transfer_spec.0.aws_s3_data_source.0.aws_access_key",
"transfer_spec.0.aws_s3_data_source.0.role_arn",
Expand Down Expand Up @@ -90,10 +112,11 @@ func ResourceStorageTransferJob() *schema.Resource {
Description: `The project in which the resource belongs. If it is not provided, the provider project is used.`,
},
"event_stream": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"schedule"},
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"schedule"},
DiffSuppressFunc: diffSuppressEventStream,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {
Expand All @@ -116,14 +139,46 @@ func ResourceStorageTransferJob() *schema.Resource {
},
},
},
"replication_spec": {
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
ConflictsWith: []string{"transfer_spec", "schedule"},
ExactlyOneOf: []string{"transfer_spec", "replication_spec"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"object_conditions": objectConditionsSchema(replicationSpecObjectConditionsKeys),
"transfer_options": transferOptionsSchema(replicationSpecTransferOptionsKeys),
"gcs_data_sink": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: gcsDataSchema(),
ExactlyOneOf: replicationSpecDataSinkKeys,
Description: `A Google Cloud Storage data sink.`,
},
"gcs_data_source": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: gcsDataSchema(),
ExactlyOneOf: replicationSpecDataSourceKeys,
Description: `A Google Cloud Storage data source.`,
},
},
},
Description: `Replication specification.`,
},
"transfer_spec": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"replication_spec"},
ExactlyOneOf: []string{"transfer_spec", "replication_spec"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"object_conditions": objectConditionsSchema(),
"transfer_options": transferOptionsSchema(),
"object_conditions": objectConditionsSchema(transferSpecObjectConditionsKeys),
"transfer_options": transferOptionsSchema(transferSpecTransferOptionsKeys),
"source_agent_pool_name": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -303,7 +358,7 @@ func ResourceStorageTransferJob() *schema.Resource {
}
}

func objectConditionsSchema() *schema.Schema {
func objectConditionsSchema(objectConditionsKeys []string) *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -364,7 +419,7 @@ func objectConditionsSchema() *schema.Schema {
}
}

func transferOptionsSchema() *schema.Schema {
func transferOptionsSchema(transferOptionsKeys []string) *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -625,6 +680,7 @@ func resourceStorageTransferJobCreate(d *schema.ResourceData, meta interface{})
Schedule: expandTransferSchedules(d.Get("schedule").([]interface{})),
EventStream: expandEventStream(d.Get("event_stream").([]interface{})),
TransferSpec: expandTransferSpecs(d.Get("transfer_spec").([]interface{})),
ReplicationSpec: expandReplicationSpecs(d.Get("replication_spec").([]interface{})),
NotificationConfig: expandTransferJobNotificationConfig(d.Get("notification_config").([]interface{})),
}

Expand Down Expand Up @@ -709,6 +765,11 @@ func resourceStorageTransferJobRead(d *schema.ResourceData, meta interface{}) er
return err
}

err = d.Set("replication_spec", flattenReplicationSpec(res.ReplicationSpec))
if err != nil {
return err
}

err = d.Set("notification_config", flattenTransferJobNotificationConfig(res.NotificationConfig))
if err != nil {
return err
Expand Down Expand Up @@ -767,6 +828,13 @@ func resourceStorageTransferJobUpdate(d *schema.ResourceData, meta interface{})
}
}

if d.HasChange("replication_spec") {
fieldMask = append(fieldMask, "replication_spec")
if v, ok := d.GetOk("replication_spec"); ok {
transferJob.ReplicationSpec = expandReplicationSpecs(v.([]interface{}))
}
}

if d.HasChange("notification_config") {
fieldMask = append(fieldMask, "notification_config")
if v, ok := d.GetOk("notification_config"); ok {
Expand Down Expand Up @@ -1249,6 +1317,9 @@ func expandTransferSpecs(transferSpecs []interface{}) *storagetransfer.TransferS
}

func flattenTransferSpec(transferSpec *storagetransfer.TransferSpec, d *schema.ResourceData) []map[string]interface{} {
if transferSpec == nil || reflect.DeepEqual(transferSpec, &storagetransfer.TransferSpec{}) {
return nil
}

data := map[string]interface{}{}

Expand Down Expand Up @@ -1326,3 +1397,44 @@ func flattenTransferJobNotificationConfig(notificationConfig *storagetransfer.No

return []map[string]interface{}{data}
}

func diffSuppressEventStream(k, old, new string, d *schema.ResourceData) bool {
// Check if it's a replication job.
_, is_replication := d.GetOk("replication_spec")
return is_replication
}

func expandReplicationSpecs(replicationSpecs []interface{}) *storagetransfer.ReplicationSpec {
if len(replicationSpecs) == 0 || replicationSpecs[0] == nil {
return nil
}

replicationSpec := replicationSpecs[0].(map[string]interface{})
return &storagetransfer.ReplicationSpec{
GcsDataSink: expandGcsData(replicationSpec["gcs_data_sink"].([]interface{})),
ObjectConditions: expandObjectConditions(replicationSpec["object_conditions"].([]interface{})),
TransferOptions: expandTransferOptions(replicationSpec["transfer_options"].([]interface{})),
GcsDataSource: expandGcsData(replicationSpec["gcs_data_source"].([]interface{})),
}
}

func flattenReplicationSpec(replicationSpec *storagetransfer.ReplicationSpec) []map[string]interface{} {
if replicationSpec == nil || reflect.DeepEqual(replicationSpec, &storagetransfer.ReplicationSpec{}) {
return nil
}

data := map[string]interface{}{}
if replicationSpec.GcsDataSink != nil {
data["gcs_data_sink"] = flattenGcsData(replicationSpec.GcsDataSink)
}
if replicationSpec.GcsDataSource != nil {
data["gcs_data_source"] = flattenGcsData(replicationSpec.GcsDataSource)
}
if replicationSpec.ObjectConditions != nil {
data["object_conditions"] = flattenObjectCondition(replicationSpec.ObjectConditions)
}
if replicationSpec.TransferOptions != nil {
data["transfer_options"] = flattenTransferOption(replicationSpec.TransferOptions)
}
return []map[string]interface{}{data}
}
Loading

0 comments on commit d4194e5

Please sign in to comment.